Remove more instances of futures01 (#4633)

* Start removing last few instances of futures01

* Use to_poll on wasm

* Revert "Use to_poll on wasm"

This reverts commit 1c61728f10d520df5f9b28c415a0db68e478b9c7.

* Fix fg test

* Upgrade network test futures

* Update offchain hyper version

* Update service test

* bump tokio to 0.2.10

* Removed some unneeded tokios

* fixes

* fix run_until_all_full

* Make service test debuggable

* Update client/offchain/src/api/http.rs

Co-Authored-By: Demi Obenour <48690212+DemiMarie-parity@users.noreply.github.com>

* Add service_test to test-int output

* nitpicking

* Finally fix test

* Give up and revert client/serviec/test

* Revert gitlab ci too

Co-authored-by: Demi Obenour <demi@parity.io>
This commit is contained in:
Ashley
2020-02-28 17:02:33 +01:00
committed by GitHub
parent c1bf4702e2
commit 9a925faf7d
16 changed files with 366 additions and 435 deletions
@@ -40,7 +40,5 @@ sc-network = { version = "0.8.0-alpha.2", path = "../../network" }
sc-network-test = { version = "0.8.0-dev", path = "../../network/test" }
sc-service = { version = "0.8.0-alpha.2", path = "../../service" }
substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../../test-utils/runtime/client" }
tokio = "0.1.22"
env_logger = "0.7.0"
tempfile = "3.1.0"
futures01 = { package = "futures", version = "0.1" }
+14 -16
View File
@@ -829,10 +829,10 @@ mod tests {
use sp_runtime::traits::{Block as BlockT, DigestFor};
use sc_network::config::ProtocolConfig;
use parking_lot::Mutex;
use tokio::runtime::current_thread;
use sp_keyring::sr25519::Keyring;
use sc_client::BlockchainEvents;
use sp_consensus_aura::sr25519::AuthorityPair;
use std::task::Poll;
type Error = sp_blockchain::Error;
@@ -950,8 +950,8 @@ mod tests {
let net = Arc::new(Mutex::new(net));
let mut import_notifications = Vec::new();
let mut aura_futures = Vec::new();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut keystore_paths = Vec::new();
for (peer_id, key) in peers {
let mut net = net.lock();
@@ -979,7 +979,7 @@ mod tests {
&inherent_data_providers, slot_duration.get()
).expect("Registers aura inherent data provider");
let aura = start_aura::<_, _, _, _, _, AuthorityPair, _, _, _>(
aura_futures.push(start_aura::<_, _, _, _, _, AuthorityPair, _, _, _>(
slot_duration,
client.clone(),
select_chain,
@@ -990,21 +990,19 @@ mod tests {
false,
keystore,
sp_consensus::AlwaysCanAuthor,
)
.expect("Starts aura")
.unit_error()
.compat();
runtime.spawn(aura);
).expect("Starts aura"));
}
runtime.spawn(futures01::future::poll_fn(move || {
net.lock().poll();
Ok::<_, ()>(futures01::Async::NotReady::<()>)
}));
runtime.block_on(future::join_all(import_notifications)
.unit_error().compat()).unwrap();
futures::executor::block_on(future::select(
future::poll_fn(move |cx| {
net.lock().poll(cx);
Poll::<()>::Pending
}),
future::select(
future::join_all(aura_futures),
future::join_all(import_notifications)
)
));
}
#[test]
@@ -54,10 +54,8 @@ sc-network-test = { version = "0.8.0-dev", path = "../../network/test" }
sc-service = { version = "0.8.0-alpha.2", path = "../../service" }
substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../../test-utils/runtime/client" }
sc-block-builder = { version = "0.8.0-alpha.2", path = "../../block-builder" }
tokio = "0.1.22"
env_logger = "0.7.0"
tempfile = "3.1.0"
futures01 = { package = "futures", version = "0.1" }
[features]
test-helpers = []
+1 -1
View File
@@ -318,7 +318,7 @@ pub struct BabeParams<B: BlockT, C, E, I, SO, SC, CAW> {
pub can_author_with: CAW,
}
/// Start the babe worker. The returned future should be run in a tokio runtime.
/// Start the babe worker.
pub fn start_babe<B, C, SC, E, I, SO, CAW, Error>(BabeParams {
keystore,
client,
+17 -18
View File
@@ -32,10 +32,9 @@ use sc_network_test::*;
use sc_network_test::{Block as TestBlock, PeersClient};
use sc_network::config::{BoxFinalityProofRequestBuilder, ProtocolConfig};
use sp_runtime::{generic::DigestItem, traits::{Block as BlockT, DigestFor}};
use tokio::runtime::current_thread;
use sc_client_api::{BlockchainEvents, backend::TransactionFor};
use log::debug;
use std::{time::Duration, cell::RefCell};
use std::{time::Duration, cell::RefCell, task::Poll};
type Item = DigestItem<Hash>;
@@ -354,7 +353,7 @@ fn run_one_test(
let net = Arc::new(Mutex::new(net));
let mut import_notifications = Vec::new();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut babe_futures = Vec::new();
let mut keystore_paths = Vec::new();
for (peer_id, seed) in peers {
@@ -399,7 +398,7 @@ fn run_one_test(
);
runtime.spawn(start_babe(BabeParams {
babe_futures.push(start_babe(BabeParams {
block_import: data.block_import.lock().take().expect("import set up during init"),
select_chain,
client,
@@ -410,23 +409,23 @@ fn run_one_test(
babe_link: data.link.clone(),
keystore,
can_author_with: sp_consensus::AlwaysCanAuthor,
}).expect("Starts babe").unit_error().compat());
}).expect("Starts babe"));
}
runtime.spawn(futures01::future::poll_fn(move || {
let mut net = net.lock();
net.poll();
for p in net.peers() {
for (h, e) in p.failed_verifications() {
panic!("Verification failed for {:?}: {}", h, e);
futures::executor::block_on(future::select(
futures::future::poll_fn(move |cx| {
let mut net = net.lock();
net.poll(cx);
for p in net.peers() {
for (h, e) in p.failed_verifications() {
panic!("Verification failed for {:?}: {}", h, e);
}
}
}
Ok::<_, ()>(futures01::Async::NotReady::<()>)
}));
runtime.block_on(future::join_all(import_notifications)
.unit_error().compat()).unwrap();
Poll::<()>::Pending
}),
future::select(future::join_all(import_notifications), future::join_all(babe_futures))
));
}
#[test]
+1 -2
View File
@@ -48,7 +48,6 @@ substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-uti
sp-consensus-babe = { version = "0.8.0-alpha.2", path = "../../primitives/consensus/babe" }
sp-state-machine = { version = "0.8.0-alpha.2", path = "../../primitives/state-machine" }
env_logger = "0.7.0"
tokio = "0.1.22"
tokio = { version = "0.2", features = ["rt-core"] }
tempfile = "3.1.0"
sp-api = { version = "2.0.0-alpha.2", path = "../../primitives/api" }
futures01 = { package = "futures", version = "0.1.29" }
+59 -70
View File
@@ -25,7 +25,7 @@ use sc_network_test::{
use sc_network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder};
use parking_lot::Mutex;
use futures_timer::Delay;
use tokio::runtime::current_thread;
use tokio::runtime::{Runtime, Handle};
use sp_keyring::Ed25519Keyring;
use sc_client::LongestChain;
use sc_client_api::backend::TransactionFor;
@@ -47,8 +47,6 @@ use sp_runtime::generic::{BlockId, DigestItem};
use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public};
use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check};
use futures01::Async;
use futures::compat::Future01CompatExt;
use authorities::AuthoritySet;
use finality_proof::{
@@ -370,27 +368,25 @@ fn create_keystore(authority: Ed25519Keyring) -> (KeyStorePtr, tempfile::TempDir
(keystore, keystore_path)
}
fn block_until_complete(future: impl Future + Unpin, net: &Arc<Mutex<GrandpaTestNet>>, runtime: &mut current_thread::Runtime) {
let drive_to_completion = futures01::future::poll_fn(|| {
net.lock().poll(); Ok::<Async<()>, ()>(Async::NotReady)
fn block_until_complete(future: impl Future + Unpin, net: &Arc<Mutex<GrandpaTestNet>>, runtime: &mut Runtime) {
let drive_to_completion = futures::future::poll_fn(|cx| {
net.lock().poll(cx); Poll::<()>::Pending
});
runtime.block_on(
future::select(future, drive_to_completion.compat())
.map(|_| Ok::<(), ()>(()))
.compat()
).unwrap();
future::select(future, drive_to_completion)
);
}
// run the voters to completion. provide a closure to be invoked after
// the voters are spawned but before blocking on them.
fn run_to_completion_with<F>(
runtime: &mut current_thread::Runtime,
runtime: &mut Runtime,
blocks: u64,
net: Arc<Mutex<GrandpaTestNet>>,
peers: &[Ed25519Keyring],
with: F,
) -> u64 where
F: FnOnce(current_thread::Handle) -> Option<Pin<Box<dyn Future<Output = ()>>>>
F: FnOnce(Handle) -> Option<Pin<Box<dyn Future<Output = ()>>>>
{
use parking_lot::RwLock;
@@ -398,7 +394,7 @@ fn run_to_completion_with<F>(
let highest_finalized = Arc::new(RwLock::new(0));
if let Some(f) = (with)(runtime.handle()) {
if let Some(f) = (with)(runtime.handle().clone()) {
wait_for.push(f);
};
@@ -456,7 +452,7 @@ fn run_to_completion_with<F>(
assert_send(&voter);
runtime.spawn(voter.unit_error().compat());
runtime.spawn(voter);
}
// wait for all finalized on each.
@@ -468,7 +464,7 @@ fn run_to_completion_with<F>(
}
fn run_to_completion(
runtime: &mut current_thread::Runtime,
runtime: &mut Runtime,
blocks: u64,
net: Arc<Mutex<GrandpaTestNet>>,
peers: &[Ed25519Keyring]
@@ -497,13 +493,13 @@ fn add_forced_change(
#[test]
fn finalize_3_voters_no_observers() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 3);
net.peer(0).push_blocks(20, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
for i in 0..3 {
assert_eq!(net.peer(i).client().info().best_number, 20,
@@ -522,14 +518,14 @@ fn finalize_3_voters_no_observers() {
#[test]
fn finalize_3_voters_1_full_observer() {
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 4);
net.peer(0).push_blocks(20, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let net = Arc::new(Mutex::new(net));
let mut finality_notifications = Vec::new();
@@ -588,7 +584,7 @@ fn finalize_3_voters_1_full_observer() {
}
for voter in voters {
runtime.spawn(voter.unit_error().compat());
runtime.spawn(voter);
}
// wait for all finalized on each.
@@ -626,10 +622,10 @@ fn transition_3_voters_twice_1_full_observer() {
let api = TestApi::new(genesis_voters);
let net = Arc::new(Mutex::new(GrandpaTestNet::new(api, 8)));
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
net.lock().peer(0).push_blocks(1, false);
net.lock().block_until_sync(&mut runtime);
net.lock().block_until_sync();
for (i, peer) in net.lock().peers().iter().enumerate() {
let full_client = peer.client().as_full().expect("only full clients are used in test");
@@ -689,7 +685,7 @@ fn transition_3_voters_twice_1_full_observer() {
future::ready(())
});
runtime.spawn(block_production.unit_error().compat());
runtime.spawn(block_production);
}
let mut finality_notifications = Vec::new();
@@ -748,7 +744,7 @@ fn transition_3_voters_twice_1_full_observer() {
};
let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network");
runtime.spawn(voter.unit_error().compat());
runtime.spawn(voter);
}
// wait for all finalized on each.
@@ -759,14 +755,14 @@ fn transition_3_voters_twice_1_full_observer() {
#[test]
fn justification_is_emitted_when_consensus_data_changes() {
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 3);
// import block#1 WITH consensus data change
let new_authorities = vec![sp_consensus_babe::AuthorityId::from_slice(&[42; 32])];
net.peer(0).push_authorities_change_block(new_authorities);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 1, net.clone(), peers);
@@ -777,13 +773,13 @@ fn justification_is_emitted_when_consensus_data_changes() {
#[test]
fn justification_is_generated_periodically() {
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(peers);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 3);
net.peer(0).push_blocks(32, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 32, net.clone(), peers);
@@ -816,7 +812,7 @@ fn consensus_changes_works() {
#[test]
fn sync_justifications_on_change_blocks() {
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let peers_b = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob];
let voters = make_ids(peers_b);
@@ -840,7 +836,7 @@ fn sync_justifications_on_change_blocks() {
// add more blocks on top of it (until we have 25)
net.peer(0).push_blocks(4, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
for i in 0..4 {
assert_eq!(net.peer(i).client().info().best_number, 25,
@@ -857,9 +853,9 @@ fn sync_justifications_on_change_blocks() {
}
// the last peer should get the justification by syncing from other peers
futures::executor::block_on(futures::future::poll_fn(move |_| {
futures::executor::block_on(futures::future::poll_fn(move |cx| {
if net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() {
net.lock().poll();
net.lock().poll(cx);
Poll::Pending
} else {
Poll::Ready(())
@@ -870,7 +866,7 @@ fn sync_justifications_on_change_blocks() {
#[test]
fn finalizes_multiple_pending_changes_in_order() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let peers_a = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let peers_b = &[Ed25519Keyring::Dave, Ed25519Keyring::Eve, Ed25519Keyring::Ferdie];
@@ -915,7 +911,7 @@ fn finalizes_multiple_pending_changes_in_order() {
// add more blocks on top of it (until we have 30)
net.peer(0).push_blocks(4, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
// all peers imported both change blocks
for i in 0..6 {
@@ -930,7 +926,7 @@ fn finalizes_multiple_pending_changes_in_order() {
#[test]
fn force_change_to_new_set() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
// two of these guys are offline.
let genesis_authorities = &[
Ed25519Keyring::Alice,
@@ -965,7 +961,7 @@ fn force_change_to_new_set() {
});
net.lock().peer(0).push_blocks(25, false);
net.lock().block_until_sync(&mut runtime);
net.lock().block_until_sync();
for (i, peer) in net.lock().peers().iter().enumerate() {
assert_eq!(peer.client().info().best_number, 26,
@@ -1093,7 +1089,7 @@ fn voter_persists_its_votes() {
use futures::channel::mpsc;
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
// we have two authorities but we'll only be running the voter for alice
// we are going to be listening for the prevotes it casts
@@ -1103,7 +1099,7 @@ fn voter_persists_its_votes() {
// alice has a chain with 20 blocks
let mut net = GrandpaTestNet::new(TestApi::new(voters.clone()), 2);
net.peer(0).push_blocks(20, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
assert_eq!(net.peer(0).client().info().best_number, 20,
"Peer #{} failed to sync", 0);
@@ -1201,7 +1197,7 @@ fn voter_persists_its_votes() {
net: net.clone(),
client: client.clone(),
keystore,
}.unit_error().compat());
});
}
let (exit_tx, exit_rx) = futures::channel::oneshot::channel::<()>();
@@ -1246,10 +1242,7 @@ fn voter_persists_its_votes() {
HasVoted::No,
);
runtime.spawn(
network.map_err(|e| panic!("network bridge should not error: {:?}", e))
.compat(),
);
runtime.spawn(network);
let round_tx = Arc::new(Mutex::new(round_tx));
let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));
@@ -1341,7 +1334,7 @@ fn voter_persists_its_votes() {
panic!()
}
}
}).map(Ok).boxed().compat());
}));
}
block_until_complete(exit_rx.into_future(), &net, &mut runtime);
@@ -1350,13 +1343,13 @@ fn voter_persists_its_votes() {
#[test]
fn finalize_3_voters_1_light_observer() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let authorities = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob, Ed25519Keyring::Charlie];
let voters = make_ids(authorities);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 4);
net.peer(0).push_blocks(20, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
for i in 0..4 {
assert_eq!(net.peer(i).client().info().best_number, 20,
@@ -1386,8 +1379,8 @@ fn finalize_3_voters_1_light_observer() {
link,
net.lock().peers[3].network_service().clone(),
Exit,
).unwrap().unit_error().compat()
).unwrap();
).unwrap()
);
Some(Box::pin(finality_notifications.map(|_| ())))
});
@@ -1396,7 +1389,7 @@ fn finalize_3_voters_1_light_observer() {
#[test]
fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let peers = &[Ed25519Keyring::Alice];
let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 1);
@@ -1407,18 +1400,17 @@ fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
net.peer(0).push_authorities_change_block(vec![sp_consensus_babe::AuthorityId::from_slice(&[42; 32])]);
let net = Arc::new(Mutex::new(net));
run_to_completion(&mut runtime, 1, net.clone(), peers);
net.lock().block_until_sync(&mut runtime);
net.lock().block_until_sync();
// check that the block#1 is finalized on light client
let mut runtime = current_thread::Runtime::new().unwrap();
let _ = runtime.block_on(futures::future::poll_fn(move |_| {
runtime.block_on(futures::future::poll_fn(move |cx| {
if net.lock().peer(1).client().info().finalized_number == 1 {
Poll::Ready(())
} else {
net.lock().poll();
net.lock().poll(cx);
Poll::Pending
}
}).unit_error().compat());
}));
}
#[test]
@@ -1427,7 +1419,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
const FORCE_CHANGE: bool = true;
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
// two of these guys are offline.
let genesis_authorities = if FORCE_CHANGE {
@@ -1472,14 +1464,14 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
vec![sp_consensus_babe::AuthorityId::from_slice(&[42; 32])]
); // #10
net.lock().peer(0).push_blocks(1, false); // best is #11
net.lock().block_until_sync(&mut runtime);
net.lock().block_until_sync();
// finalize block #11 on full clients
run_to_completion(&mut runtime, 11, net.clone(), peers_a);
// request finalization by light client
net.lock().add_light_peer(&GrandpaTestNet::default_config());
net.lock().block_until_sync(&mut runtime);
net.lock().block_until_sync();
// check block, finalized on light client
assert_eq!(
@@ -1491,14 +1483,14 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
#[test]
fn voter_catches_up_to_latest_round_when_behind() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut runtime = Runtime::new().unwrap();
let peers = &[Ed25519Keyring::Alice, Ed25519Keyring::Bob];
let voters = make_ids(peers);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 3);
net.peer(0).push_blocks(50, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let net = Arc::new(Mutex::new(net));
let mut finality_notifications = Vec::new();
@@ -1548,19 +1540,18 @@ fn voter_catches_up_to_latest_round_when_behind() {
let voter = voter(Some(keystore), peer_id, link, net.clone());
runtime.spawn(voter.unit_error().compat());
runtime.spawn(voter);
}
// wait for them to finalize block 50. since they'll vote on 3/4 of the
// unfinalized chain it will take at least 4 rounds to do it.
let wait_for_finality = ::futures::future::join_all(finality_notifications)
.map(|_| ());
let wait_for_finality = ::futures::future::join_all(finality_notifications);
// spawn a new voter, it should be behind by at least 4 rounds and should be
// able to catch up to the latest round
let test = {
let net = net.clone();
let runtime = runtime.handle();
let runtime = runtime.handle().clone();
wait_for_finality.then(move |_| {
let peer_id = 2;
@@ -1574,7 +1565,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
let voter = voter(None, peer_id, link, net);
runtime.spawn(voter.unit_error().compat()).unwrap();
runtime.spawn(voter);
let start_time = std::time::Instant::now();
let timeout = Duration::from_secs(5 * 60);
@@ -1595,14 +1586,12 @@ fn voter_catches_up_to_latest_round_when_behind() {
})
};
let drive_to_completion = futures01::future::poll_fn(|| {
net.lock().poll(); Ok::<Async<()>, ()>(Async::NotReady)
let drive_to_completion = futures::future::poll_fn(|cx| {
net.lock().poll(cx); Poll::<()>::Pending
});
runtime.block_on(
future::select(test, drive_to_completion.compat())
.map(|_| Ok::<(), ()>(()))
.compat()
).unwrap();
future::select(test, drive_to_completion)
);
}
#[test]
+1 -3
View File
@@ -13,8 +13,7 @@ repository = "https://github.com/paritytech/substrate/"
sc-network = { version = "0.8.0-alpha.2", path = "../" }
log = "0.4.8"
parking_lot = "0.10.0"
futures = "0.1.29"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures = "0.3.1"
futures-timer = "3.0.1"
rand = "0.7.2"
libp2p = { version = "0.16.1", default-features = false, features = ["libp2p-websocket"] }
@@ -30,4 +29,3 @@ env_logger = "0.7.0"
substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../../test-utils/runtime/client" }
substrate-test-runtime = { version = "2.0.0-dev", path = "../../../test-utils/runtime" }
tempfile = "3.1.0"
tokio = "0.1.22"
+29 -34
View File
@@ -21,7 +21,7 @@ mod block_import;
#[cfg(test)]
mod sync;
use std::{collections::HashMap, pin::Pin, sync::Arc, marker::PhantomData};
use std::{collections::HashMap, pin::Pin, sync::Arc, marker::PhantomData, task::{Poll, Context as FutureContext}};
use libp2p::build_multiaddr;
use log::trace;
@@ -46,7 +46,6 @@ use sp_consensus::block_import::{BlockImport, ImportResult};
use sp_consensus::Error as ConsensusError;
use sp_consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport};
use futures::prelude::*;
use futures03::{Future as _, FutureExt as _, TryFutureExt as _, StreamExt as _, TryStreamExt as _};
use sc_network::{NetworkWorker, NetworkStateInfo, NetworkService, ReportHandle, config::ProtocolId};
use sc_network::config::{NetworkConfiguration, TransportConfig, BoxFinalityProofRequestBuilder};
use libp2p::PeerId;
@@ -187,8 +186,8 @@ pub struct Peer<D> {
select_chain: Option<LongestChain<substrate_test_runtime_client::Backend, Block>>,
backend: Option<Arc<substrate_test_runtime_client::Backend>>,
network: NetworkWorker<Block, <Block as BlockT>::Hash>,
imported_blocks_stream: Box<dyn Stream<Item = BlockImportNotification<Block>, Error = ()> + Send>,
finality_notification_stream: Box<dyn Stream<Item = FinalityNotification<Block>, Error = ()> + Send>,
imported_blocks_stream: Pin<Box<dyn Stream<Item = BlockImportNotification<Block>> + Send>>,
finality_notification_stream: Pin<Box<dyn Stream<Item = FinalityNotification<Block>> + Send>>,
}
impl<D> Peer<D> {
@@ -649,10 +648,8 @@ pub trait TestNetFactory: Sized {
peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone());
}
let imported_blocks_stream = Box::new(client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat().fuse());
let finality_notification_stream = Box::new(client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat().fuse());
let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse());
let finality_notification_stream = Box::pin(client.finality_notification_stream().fuse());
peers.push(Peer {
data,
@@ -724,10 +721,8 @@ pub trait TestNetFactory: Sized {
peer.network.add_known_address(network.service().local_peer_id(), listen_addr.clone());
}
let imported_blocks_stream = Box::new(client.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat().fuse());
let finality_notification_stream = Box::new(client.finality_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat().fuse());
let imported_blocks_stream = Box::pin(client.import_notification_stream().fuse());
let finality_notification_stream = Box::pin(client.finality_notification_stream().fuse());
peers.push(Peer {
data,
@@ -746,70 +741,70 @@ pub trait TestNetFactory: Sized {
/// Polls the testnet until all nodes are in sync.
///
/// Must be executed in a task context.
fn poll_until_sync(&mut self) -> Async<()> {
self.poll();
fn poll_until_sync(&mut self, cx: &mut FutureContext) -> Poll<()> {
self.poll(cx);
// Return `NotReady` if there's a mismatch in the highest block number.
let mut highest = None;
for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
return Async::NotReady
return Poll::Pending
}
if peer.network.num_sync_requests() != 0 {
return Async::NotReady
return Poll::Pending
}
match (highest, peer.client.info().best_hash) {
(None, b) => highest = Some(b),
(Some(ref a), ref b) if a == b => {},
(Some(_), _) => return Async::NotReady,
(Some(_), _) => return Poll::Pending
}
}
Async::Ready(())
Poll::Ready(())
}
/// Polls the testnet until theres' no activiy of any kind.
///
/// Must be executed in a task context.
fn poll_until_idle(&mut self) -> Async<()> {
self.poll();
fn poll_until_idle(&mut self, cx: &mut FutureContext) -> Poll<()> {
self.poll(cx);
for peer in self.peers().iter() {
if peer.is_major_syncing() || peer.network.num_queued_blocks() != 0 {
return Async::NotReady
return Poll::Pending
}
if peer.network.num_sync_requests() != 0 {
return Async::NotReady
return Poll::Pending
}
}
Async::Ready(())
Poll::Ready(())
}
/// Blocks the current thread until we are sync'ed.
///
/// Calls `poll_until_sync` repeatedly with the runtime passed as parameter.
fn block_until_sync(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) {
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_sync()))).unwrap();
/// Calls `poll_until_sync` repeatedly.
fn block_until_sync(&mut self) {
futures::executor::block_on(futures::future::poll_fn::<(), _>(|cx| self.poll_until_sync(cx)));
}
/// Blocks the current thread until there are no pending packets.
///
/// Calls `poll_until_idle` repeatedly with the runtime passed as parameter.
fn block_until_idle(&mut self, runtime: &mut tokio::runtime::current_thread::Runtime) {
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| Ok(self.poll_until_idle()))).unwrap();
fn block_until_idle(&mut self) {
futures::executor::block_on(futures::future::poll_fn::<(), _>(|cx| self.poll_until_idle(cx)));
}
/// Polls the testnet. Processes all the pending actions and returns `NotReady`.
fn poll(&mut self) {
fn poll(&mut self, cx: &mut FutureContext) {
self.mut_peers(|peers| {
for peer in peers {
trace!(target: "sync", "-- Polling {}", peer.id());
futures03::future::poll_fn(|cx| Pin::new(&mut peer.network).poll(cx))
.map(|item| Ok::<_, ()>(item))
.compat().poll().unwrap();
if let Poll::Ready(res) = Pin::new(&mut peer.network).poll(cx) {
res.unwrap();
}
trace!(target: "sync", "-- Polling complete {}", peer.id());
// We poll `imported_blocks_stream`.
while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() {
while let Poll::Ready(Some(notification)) = peer.imported_blocks_stream.as_mut().poll_next(cx) {
peer.network.on_block_imported(
notification.header,
Vec::new(),
@@ -819,7 +814,7 @@ pub trait TestNetFactory: Sized {
// We poll `finality_notification_stream`, but we only take the last event.
let mut last = None;
while let Ok(Async::Ready(Some(item))) = peer.finality_notification_stream.poll() {
while let Poll::Ready(Some(item)) = peer.finality_notification_stream.as_mut().poll_next(cx) {
last = Some(item);
}
if let Some(notification) = last {
+127 -151
View File
@@ -16,14 +16,12 @@
use sc_network::config::Roles;
use sp_consensus::BlockOrigin;
use futures03::TryFutureExt as _;
use std::time::Duration;
use tokio::runtime::current_thread;
use futures::executor::block_on;
use super::*;
fn test_ancestor_search_when_common_is(n: usize) {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(n, false);
@@ -34,7 +32,7 @@ fn test_ancestor_search_when_common_is(n: usize) {
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
@@ -42,24 +40,22 @@ fn test_ancestor_search_when_common_is(n: usize) {
#[test]
fn sync_peers_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
for peer in 0..3 {
if net.peer(peer).num_peers() != 2 {
return Ok(Async::NotReady)
return Poll::Pending
}
}
Ok(Async::Ready(()))
})).unwrap();
Poll::Ready(())
}));
}
#[test]
fn sync_cycle_from_offline_to_syncing_to_offline() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
for peer in 0..3 {
// Offline, and not major syncing.
@@ -71,51 +67,50 @@ fn sync_cycle_from_offline_to_syncing_to_offline() {
net.peer(2).push_blocks(100, false);
// Block until all nodes are online and nodes 0 and 1 and major syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
for peer in 0..3 {
// Online
if net.peer(peer).is_offline() {
return Ok(Async::NotReady)
return Poll::Pending
}
if peer < 2 {
// Major syncing.
if net.peer(peer).blocks_count() < 100 && !net.peer(peer).is_major_syncing() {
return Ok(Async::NotReady)
return Poll::Pending
}
}
}
Ok(Async::Ready(()))
})).unwrap();
Poll::Ready(())
}));
// Block until all nodes are done syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
for peer in 0..3 {
if net.peer(peer).is_major_syncing() {
return Ok(Async::NotReady)
return Poll::Pending
}
}
Ok(Async::Ready(()))
})).unwrap();
Poll::Ready(())
}));
// Now drop nodes 1 and 2, and check that node 0 is offline.
net.peers.remove(2);
net.peers.remove(1);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if !net.peer(0).is_offline() {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(())
}
})).unwrap();
}));
}
#[test]
fn syncing_node_not_major_syncing_when_disconnected() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
// Generate blocks.
@@ -125,36 +120,35 @@ fn syncing_node_not_major_syncing_when_disconnected() {
assert!(!net.peer(1).is_major_syncing());
// Check that we switch to major syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if !net.peer(1).is_major_syncing() {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(())
}
})).unwrap();
}));
// Destroy two nodes, and check that we switch to non-major syncing.
net.peers.remove(2);
net.peers.remove(0);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(0).is_major_syncing() {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(())
}
})).unwrap();
}));
}
#[test]
fn sync_from_two_peers_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
assert!(!net.peer(0).is_major_syncing());
@@ -163,12 +157,11 @@ fn sync_from_two_peers_works() {
#[test]
fn sync_from_two_peers_with_ancestry_search_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
@@ -176,14 +169,13 @@ fn sync_from_two_peers_with_ancestry_search_works() {
#[test]
fn ancestry_search_works_when_backoff_is_one() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(1, false);
net.peer(1).push_blocks(2, false);
net.peer(2).push_blocks(2, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
@@ -191,14 +183,13 @@ fn ancestry_search_works_when_backoff_is_one() {
#[test]
fn ancestry_search_works_when_ancestor_is_genesis() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(13, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
@@ -221,10 +212,9 @@ fn ancestry_search_works_when_common_is_hundred() {
#[test]
fn sync_long_chain_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(1).push_blocks(500, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
}
@@ -232,18 +222,17 @@ fn sync_long_chain_works() {
#[test]
fn sync_no_common_longer_chain_fails() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(20, true);
net.peer(1).push_blocks(20, false);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(0).is_major_syncing() {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(())
}
})).unwrap();
}));
let peer1 = &net.peers()[1];
assert!(!net.peers()[0].blockchain_canon_equals(peer1));
}
@@ -251,10 +240,9 @@ fn sync_no_common_longer_chain_fails() {
#[test]
fn sync_justifications() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = JustificationTestNet::new(3);
net.peer(0).push_blocks(20, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
// there's currently no justification for block #10
assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), None);
@@ -274,26 +262,25 @@ fn sync_justifications() {
net.peer(1).request_justification(&h2.hash().into(), 15);
net.peer(1).request_justification(&h3.hash().into(), 20);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
for height in (10..21).step_by(5) {
if net.peer(0).client().justification(&BlockId::Number(height)).unwrap() != Some(Vec::new()) {
return Ok(Async::NotReady);
return Poll::Pending;
}
if net.peer(1).client().justification(&BlockId::Number(height)).unwrap() != Some(Vec::new()) {
return Ok(Async::NotReady);
return Poll::Pending;
}
}
Ok(Async::Ready(()))
})).unwrap();
Poll::Ready(())
}));
}
#[test]
fn sync_justifications_across_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = JustificationTestNet::new(3);
// we push 5 blocks
net.peer(0).push_blocks(5, false);
@@ -303,30 +290,29 @@ fn sync_justifications_across_forks() {
// peer 1 will only see the longer fork. but we'll request justifications
// for both and finalize the small fork instead.
net.block_until_sync(&mut runtime);
net.block_until_sync();
net.peer(0).client().finalize_block(BlockId::Hash(f1_best), Some(Vec::new()), true).unwrap();
net.peer(1).request_justification(&f1_best, 10);
net.peer(1).request_justification(&f2_best, 11);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(0).client().justification(&BlockId::Number(10)).unwrap() == Some(Vec::new()) &&
net.peer(1).client().justification(&BlockId::Number(10)).unwrap() == Some(Vec::new())
{
Ok(Async::Ready(()))
Poll::Ready(())
} else {
Ok(Async::NotReady)
Poll::Pending
}
})).unwrap();
}));
}
#[test]
fn sync_after_fork_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
@@ -340,7 +326,7 @@ fn sync_after_fork_works() {
net.peer(2).push_blocks(1, false);
// peer 1 has the best chain
net.block_until_sync(&mut runtime);
net.block_until_sync();
let peer1 = &net.peers()[1];
assert!(net.peers()[0].blockchain_canon_equals(peer1));
(net.peers()[1].blockchain_canon_equals(peer1));
@@ -350,7 +336,6 @@ fn sync_after_fork_works() {
#[test]
fn syncs_all_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(4);
net.peer(0).push_blocks(2, false);
net.peer(1).push_blocks(2, false);
@@ -358,7 +343,7 @@ fn syncs_all_forks() {
net.peer(0).push_blocks(2, true);
net.peer(1).push_blocks(4, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
// Check that all peers have all of the blocks.
assert_eq!(9, net.peer(0).blocks_count());
assert_eq!(9, net.peer(1).blocks_count());
@@ -367,12 +352,11 @@ fn syncs_all_forks() {
#[test]
fn own_blocks_are_announced() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.block_until_sync(&mut runtime); // connect'em
net.block_until_sync(); // connect'em
net.peer(0).generate_blocks(1, BlockOrigin::Own, |builder| builder.build().unwrap().block);
net.block_until_sync(&mut runtime);
net.block_until_sync();
assert_eq!(net.peer(0).client.info().best_number, 1);
assert_eq!(net.peer(1).client.info().best_number, 1);
@@ -384,7 +368,6 @@ fn own_blocks_are_announced() {
#[test]
fn blocks_are_not_announced_by_light_nodes() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(0);
// full peer0 is connected to light peer
@@ -397,7 +380,7 @@ fn blocks_are_not_announced_by_light_nodes() {
// Sync between 0 and 1.
net.peer(0).push_blocks(1, false);
assert_eq!(net.peer(0).client.info().best_number, 1);
net.block_until_sync(&mut runtime);
net.block_until_sync();
assert_eq!(net.peer(1).client.info().best_number, 1);
// Add another node and remove node 0.
@@ -405,18 +388,17 @@ fn blocks_are_not_announced_by_light_nodes() {
net.peers.remove(0);
// Poll for a few seconds and make sure 1 and 2 (now 0 and 1) don't sync together.
let mut delay = futures_timer::Delay::new(Duration::from_secs(5)).unit_error().compat();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
delay.poll().map_err(|_| ())
})).unwrap();
let mut delay = futures_timer::Delay::new(Duration::from_secs(5));
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
Pin::new(&mut delay).poll(cx)
}));
assert_eq!(net.peer(1).client.info().best_number, 0);
}
#[test]
fn can_sync_small_non_best_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
@@ -435,14 +417,14 @@ fn can_sync_small_non_best_forks() {
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none());
// poll until the two nodes connect, otherwise announcing the block will not work
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(0).num_peers() == 0 {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(())
}
})).unwrap();
}));
// synchronization: 0 synced to longer chain and 1 didn't sync to small chain.
@@ -455,32 +437,31 @@ fn can_sync_small_non_best_forks() {
// after announcing, peer 1 downloads the block.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
return Poll::Pending
}
Ok(Async::Ready(()))
})).unwrap();
net.block_until_sync(&mut runtime);
Poll::Ready(())
}));
net.block_until_sync();
let another_fork = net.peer(0).push_blocks_at(BlockId::Number(35), 2, true);
net.peer(0).announce_block(another_fork, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client().header(&BlockId::Hash(another_fork)).unwrap().is_none() {
return Ok(Async::NotReady)
return Poll::Pending
}
Ok(Async::Ready(()))
})).unwrap();
Poll::Ready(())
}));
}
#[test]
fn can_not_sync_from_light_peer() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
// given the network with 1 full nodes (#0) and 1 light node (#1)
let mut net = TestNet::new(1);
@@ -490,7 +471,7 @@ fn can_not_sync_from_light_peer() {
net.peer(0).push_blocks(1, false);
// and let the light client sync from this node
net.block_until_sync(&mut runtime);
net.block_until_sync();
// ensure #0 && #1 have the same best block
let full0_info = net.peer(0).client.info();
@@ -504,29 +485,28 @@ fn can_not_sync_from_light_peer() {
net.peers.remove(0);
// ensure that the #2 (now #1) fails to sync block #1 even after 5 seconds
let mut test_finished = futures_timer::Delay::new(Duration::from_secs(5)).unit_error().compat();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
test_finished.poll().map_err(|_| ())
})).unwrap();
let mut test_finished = futures_timer::Delay::new(Duration::from_secs(5));
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
Pin::new(&mut test_finished).poll(cx)
}));
}
#[test]
fn light_peer_imports_header_from_announce() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
fn import_with_announce(net: &mut TestNet, runtime: &mut current_thread::Runtime, hash: H256) {
fn import_with_announce(net: &mut TestNet, hash: H256) {
net.peer(0).announce_block(hash, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some() {
Ok(Async::Ready(()))
Poll::Ready(())
} else {
Ok(Async::NotReady)
Poll::Pending
}
})).unwrap();
}));
}
// given the network with 1 full nodes (#0) and 1 light node (#1)
@@ -534,21 +514,20 @@ fn light_peer_imports_header_from_announce() {
net.add_light_peer(&Default::default());
// let them connect to each other
net.block_until_sync(&mut runtime);
net.block_until_sync();
// check that NEW block is imported from announce message
let new_hash = net.peer(0).push_blocks(1, false);
import_with_announce(&mut net, &mut runtime, new_hash);
import_with_announce(&mut net, new_hash);
// check that KNOWN STALE block is imported from announce message
let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true);
import_with_announce(&mut net, &mut runtime, known_stale_hash);
import_with_announce(&mut net, known_stale_hash);
}
#[test]
fn can_sync_explicit_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
@@ -568,14 +547,14 @@ fn can_sync_explicit_forks() {
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none());
// poll until the two nodes connect, otherwise announcing the block will not work
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(())
}
})).unwrap();
}));
// synchronization: 0 synced to longer chain and 1 didn't sync to small chain.
@@ -589,21 +568,20 @@ fn can_sync_explicit_forks() {
net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number);
// peer 1 downloads the block.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
return Poll::Pending
}
Ok(Async::Ready(()))
})).unwrap();
Poll::Ready(())
}));
}
#[test]
fn syncs_header_only_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(0);
let config = ProtocolConfig::default();
net.add_full_peer_with_states(&config, None);
@@ -616,7 +594,7 @@ fn syncs_header_only_forks() {
let small_number = net.peer(0).client().info().best_number;
net.peer(1).push_blocks(4, false);
net.block_until_sync(&mut runtime);
net.block_until_sync();
// Peer 1 will sync the small fork even though common block state is missing
assert_eq!(9, net.peer(0).blocks_count());
assert_eq!(9, net.peer(1).blocks_count());
@@ -624,19 +602,18 @@ fn syncs_header_only_forks() {
// Request explicit header-only sync request for the ancient fork.
let first_peer_id = net.peer(0).id();
net.peer(1).set_sync_fork_request(vec![first_peer_id], small_hash, small_number);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
return Poll::Pending
}
Ok(Async::Ready(()))
})).unwrap();
Poll::Ready(())
}));
}
#[test]
fn does_not_sync_announced_old_best_block() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
let old_hash = net.peer(0).push_blocks(1, false);
@@ -645,19 +622,19 @@ fn does_not_sync_announced_old_best_block() {
net.peer(1).push_blocks(20, true);
net.peer(0).announce_block(old_hash, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
block_on(futures::future::poll_fn::<(), _>(|cx| {
// poll once to import announcement
net.poll();
Ok(Async::Ready(()))
})).unwrap();
net.poll(cx);
Poll::Ready(())
}));
assert!(!net.peer(1).is_major_syncing());
net.peer(0).announce_block(old_hash_with_parent, Vec::new());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
block_on(futures::future::poll_fn::<(), _>(|cx| {
// poll once to import announcement
net.poll();
Ok(Async::Ready(()))
})).unwrap();
net.poll(cx);
Poll::Ready(())
}));
assert!(!net.peer(1).is_major_syncing());
}
@@ -665,19 +642,18 @@ fn does_not_sync_announced_old_best_block() {
fn full_sync_requires_block_body() {
// Check that we don't sync headers-only in full mode.
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(0).push_headers(1);
// Wait for nodes to connect
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
block_on(futures::future::poll_fn::<(), _>(|cx| {
net.poll(cx);
if net.peer(0).num_peers() == 0 || net.peer(1).num_peers() == 0 {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(())
}
})).unwrap();
net.block_until_idle(&mut runtime);
}));
net.block_until_idle();
assert_eq!(net.peer(1).client.info().best_number, 0);
}
-1
View File
@@ -66,4 +66,3 @@ substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-util
sp-consensus-babe = { version = "0.8.0-alpha.2", path = "../../primitives/consensus/babe" }
grandpa = { version = "0.8.0-alpha.2", package = "sc-finality-grandpa", path = "../finality-grandpa" }
grandpa-primitives = { version = "2.0.0-alpha.2", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" }
tokio = { version = "0.2", features = ["rt-core"] }
+4 -2
View File
@@ -116,6 +116,8 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {
marker: PhantomData<TBl>,
}
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Unpin for Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> {}
/// Alias for a an implementation of `futures::future::Executor`.
pub type TaskExecutor = Arc<dyn Spawn + Send + Sync>;
@@ -229,7 +231,7 @@ impl<TBl, TBackend, TExec, TRtApi, TSc, TExPool, TOc> AbstractService for
Service<TBl, Client<TBackend, TExec, TBl, TRtApi>, TSc, NetworkStatus<TBl>,
NetworkService<TBl, TBl::Hash>, TExPool, TOc>
where
TBl: BlockT + Unpin,
TBl: BlockT,
TBackend: 'static + sc_client_api::backend::Backend<TBl>,
TExec: 'static + sc_client::CallExecutor<TBl> + Send + Sync + Clone,
TRtApi: 'static + Send + Sync,
@@ -328,7 +330,7 @@ where
}
}
impl<TBl: Unpin, TCl, TSc: Unpin, TNetStatus, TNet, TTxPool, TOc> Future for
impl<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc> Future for
Service<TBl, TCl, TSc, TNetStatus, TNet, TTxPool, TOc>
{
type Output = Result<(), Error>;
+9 -20
View File
@@ -122,28 +122,17 @@ mod tests {
let (tx, rx) = mpsc::unbounded();
status_sinks.push(Duration::from_millis(100), tx);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let mut val_order = 5;
runtime.spawn(futures::future::poll_fn(move |cx| {
status_sinks.poll(cx, || { val_order += 1; val_order });
Poll::<()>::Pending
}));
let done = rx
.into_future()
.then(|(item, rest)| {
assert_eq!(item, Some(6));
rest.into_future()
futures::executor::block_on(futures::future::select(
futures::future::poll_fn(move |cx| {
status_sinks.poll(cx, || { val_order += 1; val_order });
Poll::<()>::Pending
}),
Box::pin(async {
let items: Vec<i32> = rx.take(3).collect().await;
assert_eq!(items, [6, 7, 8]);
})
.then(|(item, rest)| {
assert_eq!(item, Some(7));
rest.into_future()
})
.map(|(item, _)| {
assert_eq!(item, Some(8));
});
runtime.block_on(done);
));
}
}