Rewrite the network TestNet (#3016)

* Add a memory-only option for the network

* Tests cleanup

* Make grandpa/aura/babe compile

* Aura and Babe tests now passing

* More work on tests rewrite

* Attempt to fix grandpa

* Make more grandpa tests pass

* More grandpa tests work

* Work on sync tests

* More work

* light_peer_imports_header_from_announce passes

* can_sync_small_non_best_forks passes

* syncing_node_not_major_syncing_when_disconnected passes

* blocks_are_not_announced_by_light_nodes passing

* All sync tests passing 🎉

* Some TestNet cleanup

* Work on grandpa tests

* More grandpa work

* GrandPa work

* Add check about block_import

* Remove the temporarily added Sync

* Fix network tests warnings

* voter_persists_its_votes passes

* Fix imports in network tests

* Fix service tests

* Call on_block_imported 🤷

* Add shortcut

* Finish using shortcut
This commit is contained in:
Pierre Krieger
2019-07-05 12:19:03 +02:00
committed by André Silva
parent abf33fe479
commit 22ec13cf65
12 changed files with 695 additions and 1403 deletions
+5 -2
View File
@@ -32,7 +32,7 @@ use service::{
};
use network::{
self, multiaddr::Protocol,
config::{NetworkConfiguration, NonReservedPeerMode, NodeKeyConfig},
config::{NetworkConfiguration, TransportConfig, NonReservedPeerMode, NodeKeyConfig},
build_multiaddr,
};
use primitives::H256;
@@ -354,7 +354,10 @@ fn fill_network_configuration(
config.in_peers = cli.in_peers;
config.out_peers = cli.out_peers;
config.enable_mdns = !is_dev && !cli.no_mdns;
config.transport = TransportConfig::Normal {
enable_mdns: !is_dev && !cli.no_mdns,
wasm_external_transport: None,
};
Ok(())
}
+6 -31
View File
@@ -705,7 +705,7 @@ pub fn import_queue<B, C, P>(
#[cfg(test)]
mod tests {
use super::*;
use futures::stream::Stream as _;
use futures::{Async, stream::Stream as _};
use consensus_common::NoNetwork as DummyOracle;
use network::test::*;
use network::test::{Block as TestBlock, PeersClient, PeersFullClient};
@@ -756,11 +756,9 @@ mod tests {
}
const SLOT_DURATION: u64 = 1;
const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50);
pub struct AuraTestNet {
peers: Vec<Arc<Peer<(), DummySpecialization>>>,
started: bool,
peers: Vec<Peer<(), DummySpecialization>>,
}
impl TestNetFactory for AuraTestNet {
@@ -772,7 +770,6 @@ mod tests {
fn from_config(_config: &ProtocolConfig) -> Self {
AuraTestNet {
peers: Vec::new(),
started: false,
}
}
@@ -800,38 +797,24 @@ mod tests {
}
}
fn uses_tokio(&self) -> bool {
true
}
fn peer(&self, i: usize) -> &Peer<Self::PeerData, DummySpecialization> {
&self.peers[i]
}
fn peers(&self) -> &Vec<Arc<Peer<Self::PeerData, DummySpecialization>>> {
fn peers(&self) -> &Vec<Peer<Self::PeerData, DummySpecialization>> {
&self.peers
}
fn mut_peers<F: FnOnce(&mut Vec<Arc<Peer<Self::PeerData, DummySpecialization>>>)>(&mut self, closure: F) {
fn mut_peers<F: FnOnce(&mut Vec<Peer<Self::PeerData, DummySpecialization>>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
fn started(&self) -> bool {
self.started
}
fn set_started(&mut self, new: bool) {
self.started = new;
}
}
#[test]
#[allow(deprecated)]
fn authoring_blocks() {
let _ = ::env_logger::try_init();
let mut net = AuraTestNet::new(3);
net.start();
let net = AuraTestNet::new(3);
let peers = &[
(0, Keyring::Alice),
@@ -884,15 +867,7 @@ mod tests {
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = ::tokio_timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().sync_without_disconnects();
Ok(())
})
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
}
+6 -30
View File
@@ -880,7 +880,7 @@ mod tests {
use super::generic::DigestItem;
use client::BlockchainEvents;
use test_client;
use futures::stream::Stream;
use futures::{Async, stream::Stream as _};
use log::debug;
use std::time::Duration;
type Item = generic::DigestItem<Hash>;
@@ -919,11 +919,9 @@ mod tests {
}
const SLOT_DURATION: u64 = 1;
const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50);
pub struct BabeTestNet {
peers: Vec<Arc<Peer<(), DummySpecialization>>>,
started: bool,
peers: Vec<Peer<(), DummySpecialization>>,
}
impl TestNetFactory for BabeTestNet {
@@ -936,7 +934,6 @@ mod tests {
debug!(target: "babe", "Creating test network from config");
BabeTestNet {
peers: Vec::new(),
started: false,
}
}
@@ -963,34 +960,22 @@ mod tests {
})
}
fn uses_tokio(&self) -> bool {
true
}
fn peer(&self, i: usize) -> &Peer<Self::PeerData, DummySpecialization> {
trace!(target: "babe", "Retreiving a peer");
&self.peers[i]
}
fn peers(&self) -> &Vec<Arc<Peer<Self::PeerData, DummySpecialization>>> {
fn peers(&self) -> &Vec<Peer<Self::PeerData, DummySpecialization>> {
trace!(target: "babe", "Retreiving peers");
&self.peers
}
fn mut_peers<F: FnOnce(&mut Vec<Arc<Peer<Self::PeerData, DummySpecialization>>>)>(
fn mut_peers<F: FnOnce(&mut Vec<Peer<Self::PeerData, DummySpecialization>>)>(
&mut self,
closure: F,
) {
closure(&mut self.peers);
}
fn started(&self) -> bool {
self.started
}
fn set_started(&mut self, new: bool) {
self.started = new;
}
}
#[test]
@@ -1003,10 +988,9 @@ mod tests {
fn authoring_blocks() {
drop(env_logger::try_init());
debug!(target: "babe", "checkpoint 1");
let mut net = BabeTestNet::new(3);
let net = BabeTestNet::new(3);
debug!(target: "babe", "checkpoint 2");
net.start();
debug!(target: "babe", "checkpoint 3");
let peers = &[
@@ -1060,15 +1044,7 @@ mod tests {
.map(drop)
.map_err(drop);
let drive_to_completion = ::tokio_timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().sync_without_disconnects();
Ok(())
})
.map(drop)
.map_err(drop);
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(drop)).unwrap();
}
+161 -267
View File
@@ -20,7 +20,6 @@ use super::*;
use network::test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, PeersClient};
use network::test::{PassThroughVerifier};
use network::config::{ProtocolConfig, Roles};
use network::consensus_gossip as network_gossip;
use parking_lot::Mutex;
use tokio::runtime::current_thread;
use keyring::ed25519::{Keyring as AuthorityKeyring};
@@ -44,7 +43,6 @@ use fg_primitives::AuthorityId;
use authorities::AuthoritySet;
use finality_proof::{FinalityProofProvider, AuthoritySetForFinalityProver, AuthoritySetForFinalityChecker};
use communication::GRANDPA_ENGINE_ID;
use consensus_changes::ConsensusChanges;
type PeerData =
@@ -62,16 +60,14 @@ type PeerData =
type GrandpaPeer = Peer<PeerData, DummySpecialization>;
struct GrandpaTestNet {
peers: Vec<Arc<GrandpaPeer>>,
peers: Vec<GrandpaPeer>,
test_config: TestApi,
started: bool,
}
impl GrandpaTestNet {
fn new(test_config: TestApi, n_peers: usize) -> Self {
let mut net = GrandpaTestNet {
peers: Vec::with_capacity(n_peers),
started: false,
test_config,
};
let config = Self::default_config();
@@ -92,7 +88,6 @@ impl TestNetFactory for GrandpaTestNet {
GrandpaTestNet {
peers: Vec::new(),
test_config: Default::default(),
started: false,
}
}
@@ -163,112 +158,17 @@ impl TestNetFactory for GrandpaTestNet {
}
}
fn uses_tokio(&self) -> bool {
true
}
fn peer(&self, i: usize) -> &GrandpaPeer {
&self.peers[i]
}
fn peers(&self) -> &Vec<Arc<GrandpaPeer>> {
fn peers(&self) -> &Vec<GrandpaPeer> {
&self.peers
}
fn mut_peers<F: FnOnce(&mut Vec<Arc<GrandpaPeer>>)>(&mut self, closure: F) {
fn mut_peers<F: FnOnce(&mut Vec<GrandpaPeer>)>(&mut self, closure: F) {
closure(&mut self.peers);
}
fn started(&self) -> bool {
self.started
}
fn set_started(&mut self, new: bool) {
self.started = new;
}
}
#[derive(Clone)]
struct MessageRouting {
inner: Arc<Mutex<GrandpaTestNet>>,
peer_id: usize,
}
impl MessageRouting {
fn new(inner: Arc<Mutex<GrandpaTestNet>>, peer_id: usize,) -> Self {
MessageRouting {
inner,
peer_id,
}
}
}
impl Network<Block> for MessageRouting {
type In = Box<dyn Stream<Item=network_gossip::TopicNotification, Error=()> + Send>;
/// Get a stream of messages for a specific gossip topic.
fn messages_for(&self, topic: Hash) -> Self::In {
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
let messages = peer.consensus_gossip_messages_for(
GRANDPA_ENGINE_ID,
topic,
);
let messages = messages.map_err(
move |_| panic!("Messages for topic {} dropped too early", topic)
);
Box::new(messages)
}
fn register_validator(&self, v: Arc<dyn network_gossip::Validator<Block>>) {
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
peer.with_gossip(move |gossip, context| {
gossip.register_validator(context, GRANDPA_ENGINE_ID, v);
});
}
fn gossip_message(&self, topic: Hash, data: Vec<u8>, force: bool) {
let inner = self.inner.lock();
inner.peer(self.peer_id).gossip_message(
topic,
GRANDPA_ENGINE_ID,
data,
force,
);
}
fn send_message(&self, who: Vec<network::PeerId>, data: Vec<u8>) {
let inner = self.inner.lock();
let peer = inner.peer(self.peer_id);
peer.with_gossip(move |gossip, ctx| for who in &who {
gossip.send_message(
ctx,
who,
network_gossip::ConsensusMessage {
engine_id: GRANDPA_ENGINE_ID,
data: data.clone(),
}
)
})
}
fn register_gossip_message(&self, _topic: Hash, _data: Vec<u8>) {
// NOTE: only required to restore previous state on startup
// not required for tests currently
}
fn report(&self, _who: network::PeerId, _cost_benefit: i32) {
}
fn announce(&self, _block: Hash) {
}
}
#[derive(Clone)]
@@ -440,7 +340,6 @@ impl AuthoritySetForFinalityChecker<Block> for TestApi {
}
const TEST_GOSSIP_DURATION: Duration = Duration::from_millis(500);
const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50);
fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(substrate_primitives::ed25519::Public, u64)> {
keys.iter()
@@ -452,6 +351,7 @@ fn make_ids(keys: &[AuthorityKeyring]) -> Vec<(substrate_primitives::ed25519::Pu
// run the voters to completion. provide a closure to be invoked after
// the voters are spawned but before blocking on them.
fn run_to_completion_with<F>(
runtime: &mut current_thread::Runtime,
blocks: u64,
net: Arc<Mutex<GrandpaTestNet>>,
peers: &[AuthorityKeyring],
@@ -462,7 +362,6 @@ fn run_to_completion_with<F>(
use parking_lot::RwLock;
let mut wait_for = Vec::new();
let mut runtime = current_thread::Runtime::new().unwrap();
let highest_finalized = Arc::new(RwLock::new(0));
@@ -472,12 +371,13 @@ fn run_to_completion_with<F>(
for (peer_id, key) in peers.iter().enumerate() {
let highest_finalized = highest_finalized.clone();
let (client, link) = {
let (client, net_service, link) = {
let net = net.lock();
// temporary needed for some reason
let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed");
(
net.peers[peer_id].client().clone(),
net.peers[peer_id].network_service().clone(),
link,
)
};
@@ -507,7 +407,7 @@ fn run_to_completion_with<F>(
name: Some(format!("peer#{}", peer_id)),
},
link: link,
network: MessageRouting::new(net.clone(), peer_id),
network: net_service,
inherent_data_providers: InherentDataProviders::new(),
on_exit: Exit,
telemetry_on_connect: None,
@@ -524,35 +424,32 @@ fn run_to_completion_with<F>(
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().sync_without_disconnects();
Ok(())
})
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
let highest_finalized = *highest_finalized.read();
highest_finalized
}
fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[AuthorityKeyring]) -> u64 {
run_to_completion_with(blocks, net, peers, |_| None)
fn run_to_completion(
runtime: &mut current_thread::Runtime,
blocks: u64,
net: Arc<Mutex<GrandpaTestNet>>,
peers: &[AuthorityKeyring]
) -> u64 {
run_to_completion_with(runtime, blocks, net, peers, |_| None)
}
#[test]
fn finalize_3_voters_no_observers() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let voters = make_ids(peers);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 3);
net.peer(0).push_blocks(20, false);
net.sync();
net.block_until_sync(&mut runtime);
for i in 0..3 {
assert_eq!(net.peer(i).client().info().chain.best_number, 20,
@@ -560,7 +457,7 @@ fn finalize_3_voters_no_observers() {
}
let net = Arc::new(Mutex::new(net));
run_to_completion(20, net.clone(), peers);
run_to_completion(&mut runtime, 20, net.clone(), peers);
// normally there's no justification for finalized blocks
assert!(net.lock().peer(0).client().justification(&BlockId::Number(20)).unwrap().is_none(),
@@ -569,28 +466,30 @@ fn finalize_3_voters_no_observers() {
#[test]
fn finalize_3_voters_1_full_observer() {
let mut runtime = current_thread::Runtime::new().unwrap();
let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let voters = make_ids(peers);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 4);
net.peer(0).push_blocks(20, false);
net.sync();
net.block_until_sync(&mut runtime);
let net = Arc::new(Mutex::new(net));
let mut finality_notifications = Vec::new();
let mut runtime = current_thread::Runtime::new().unwrap();
let all_peers = peers.iter()
.cloned()
.map(|key| Some(Arc::new(key.into())))
.chain(::std::iter::once(None));
for (peer_id, local_key) in all_peers.enumerate() {
let (client, link) = {
let (client, net_service, link) = {
let net = net.lock();
let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed");
(
net.peers[peer_id].client().clone(),
net.peers[peer_id].network_service().clone(),
link,
)
};
@@ -608,7 +507,7 @@ fn finalize_3_voters_1_full_observer() {
name: Some(format!("peer#{}", peer_id)),
},
link: link,
network: MessageRouting::new(net.clone(), peer_id),
network: net_service,
inherent_data_providers: InherentDataProviders::new(),
on_exit: Exit,
telemetry_on_connect: None,
@@ -623,11 +522,7 @@ fn finalize_3_voters_1_full_observer() {
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| { net.lock().sync_without_disconnects(); Ok(()) })
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
}
@@ -663,7 +558,7 @@ fn transition_3_voters_twice_1_full_observer() {
let mut runtime = current_thread::Runtime::new().unwrap();
net.lock().peer(0).push_blocks(1, false);
net.lock().sync();
net.lock().block_until_sync(&mut runtime);
for (i, peer) in net.lock().peers().iter().enumerate() {
let full_client = peer.client().as_full().expect("only full clients are used in test");
@@ -745,11 +640,12 @@ fn transition_3_voters_twice_1_full_observer() {
.enumerate();
for (peer_id, local_key) in all_peers {
let (client, link) = {
let (client, net_service, link) = {
let net = net.lock();
let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed");
(
net.peers[peer_id].client().clone(),
net.peers[peer_id].network_service().clone(),
link,
)
};
@@ -777,7 +673,7 @@ fn transition_3_voters_twice_1_full_observer() {
name: Some(format!("peer#{}", peer_id)),
},
link: link,
network: MessageRouting::new(net.clone(), peer_id),
network: net_service,
inherent_data_providers: InherentDataProviders::new(),
on_exit: Exit,
telemetry_on_connect: None,
@@ -792,30 +688,22 @@ fn transition_3_voters_twice_1_full_observer() {
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().sync_without_disconnects();
Ok(())
})
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap();
}
#[test]
fn justification_is_emitted_when_consensus_data_changes() {
let mut runtime = current_thread::Runtime::new().unwrap();
let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 3);
// import block#1 WITH consensus data change
let new_authorities = vec![substrate_primitives::sr25519::Public::from_raw([42; 32])];
net.peer(0).push_authorities_change_block(new_authorities);
net.sync();
net.block_until_sync(&mut runtime);
let net = Arc::new(Mutex::new(net));
run_to_completion(1, net.clone(), peers);
run_to_completion(&mut runtime, 1, net.clone(), peers);
// ... and check that there's justification for block#1
assert!(net.lock().peer(0).client().justification(&BlockId::Number(1)).unwrap().is_some(),
@@ -824,15 +712,16 @@ fn justification_is_emitted_when_consensus_data_changes() {
#[test]
fn justification_is_generated_periodically() {
let mut runtime = current_thread::Runtime::new().unwrap();
let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let voters = make_ids(peers);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 3);
net.peer(0).push_blocks(32, false);
net.sync();
net.block_until_sync(&mut runtime);
let net = Arc::new(Mutex::new(net));
run_to_completion(32, net.clone(), peers);
run_to_completion(&mut runtime, 32, net.clone(), peers);
// when block#32 (justification_period) is finalized, justification
// is required => generated
@@ -862,6 +751,7 @@ fn consensus_changes_works() {
#[test]
fn sync_justifications_on_change_blocks() {
let mut runtime = current_thread::Runtime::new().unwrap();
let peers_a = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let peers_b = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob];
let voters = make_ids(peers_b);
@@ -886,7 +776,7 @@ fn sync_justifications_on_change_blocks() {
// add more blocks on top of it (until we have 25)
net.peer(0).push_blocks(4, false);
net.sync();
net.block_until_sync(&mut runtime);
for i in 0..4 {
assert_eq!(net.peer(i).client().info().chain.best_number, 25,
@@ -894,7 +784,7 @@ fn sync_justifications_on_change_blocks() {
}
let net = Arc::new(Mutex::new(net));
run_to_completion(25, net.clone(), peers_a);
run_to_completion(&mut runtime, 25, net.clone(), peers_a);
// the first 3 peers are grandpa voters and therefore have already finalized
// block 21 and stored a justification
@@ -903,14 +793,20 @@ fn sync_justifications_on_change_blocks() {
}
// the last peer should get the justification by syncing from other peers
while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() {
net.lock().sync_without_disconnects();
}
runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> {
if net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() {
net.lock().poll();
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap()
}
#[test]
fn finalizes_multiple_pending_changes_in_order() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let peers_a = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let peers_b = &[AuthorityKeyring::Dave, AuthorityKeyring::Eve, AuthorityKeyring::Ferdie];
@@ -956,7 +852,7 @@ fn finalizes_multiple_pending_changes_in_order() {
// add more blocks on top of it (until we have 30)
net.peer(0).push_blocks(4, false);
net.sync();
net.block_until_sync(&mut runtime);
// all peers imported both change blocks
for i in 0..6 {
@@ -965,11 +861,12 @@ fn finalizes_multiple_pending_changes_in_order() {
}
let net = Arc::new(Mutex::new(net));
run_to_completion(30, net.clone(), all_peers);
run_to_completion(&mut runtime, 30, net.clone(), all_peers);
}
#[test]
fn doesnt_vote_on_the_tip_of_the_chain() {
let mut runtime = current_thread::Runtime::new().unwrap();
let peers_a = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let voters = make_ids(peers_a);
let api = TestApi::new(voters);
@@ -977,7 +874,7 @@ fn doesnt_vote_on_the_tip_of_the_chain() {
// add 100 blocks
net.peer(0).push_blocks(100, false);
net.sync();
net.block_until_sync(&mut runtime);
for i in 0..3 {
assert_eq!(net.peer(i).client().info().chain.best_number, 100,
@@ -985,7 +882,7 @@ fn doesnt_vote_on_the_tip_of_the_chain() {
}
let net = Arc::new(Mutex::new(net));
let highest = run_to_completion(75, net.clone(), peers_a);
let highest = run_to_completion(&mut runtime, 75, net.clone(), peers_a);
// the highest block to be finalized will be 3/4 deep in the unfinalized chain
assert_eq!(highest, 75);
@@ -993,6 +890,8 @@ fn doesnt_vote_on_the_tip_of_the_chain() {
#[test]
fn force_change_to_new_set() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
// two of these guys are offline.
let genesis_authorities = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie, AuthorityKeyring::One, AuthorityKeyring::Two];
let peers_a = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
@@ -1004,49 +903,44 @@ fn force_change_to_new_set() {
let net = GrandpaTestNet::new(api, 3);
let net = Arc::new(Mutex::new(net));
let runner_net = net.clone();
let add_blocks = move |_| {
net.lock().peer(0).push_blocks(1, false);
net.lock().peer(0).push_blocks(1, false);
{
// add a forced transition at block 12.
let parent_hash = net.lock().peer(0).client().info().chain.best_hash;
forced_transitions.lock().insert(parent_hash, (0, ScheduledChange {
next_authorities: voters.clone(),
delay: 10,
}));
{
// add a forced transition at block 12.
let parent_hash = net.lock().peer(0).client().info().chain.best_hash;
forced_transitions.lock().insert(parent_hash, (0, ScheduledChange {
next_authorities: voters.clone(),
delay: 10,
}));
// add a normal transition too to ensure that forced changes take priority.
normal_transitions.lock().insert(parent_hash, ScheduledChange {
next_authorities: make_ids(genesis_authorities),
delay: 5,
});
}
// add a normal transition too to ensure that forced changes take priority.
normal_transitions.lock().insert(parent_hash, ScheduledChange {
next_authorities: make_ids(genesis_authorities),
delay: 5,
});
}
net.lock().peer(0).push_blocks(25, false);
net.lock().sync();
net.lock().peer(0).push_blocks(25, false);
net.lock().block_until_sync(&mut runtime);
for (i, peer) in net.lock().peers().iter().enumerate() {
assert_eq!(peer.client().info().chain.best_number, 26,
"Peer #{} failed to sync", i);
for (i, peer) in net.lock().peers().iter().enumerate() {
assert_eq!(peer.client().info().chain.best_number, 26,
"Peer #{} failed to sync", i);
let full_client = peer.client().as_full().expect("only full clients are used in test");
let set: AuthoritySet<Hash, BlockNumber> = crate::aux_schema::load_authorities(
#[allow(deprecated)]
&**full_client.backend()
).unwrap();
let full_client = peer.client().as_full().expect("only full clients are used in test");
let set: AuthoritySet<Hash, BlockNumber> = crate::aux_schema::load_authorities(
#[allow(deprecated)]
&**full_client.backend()
).unwrap();
assert_eq!(set.current(), (1, voters.as_slice()));
assert_eq!(set.pending_changes().count(), 0);
}
None
};
assert_eq!(set.current(), (1, voters.as_slice()));
assert_eq!(set.pending_changes().count(), 0);
}
// it will only finalize if the forced transition happens.
// we add_blocks after the voters are spawned because otherwise
// the link-halfs have the wrong AuthoritySet
run_to_completion_with(25, runner_net, peers_a, add_blocks);
run_to_completion(&mut runtime, 25, net, peers_a);
}
#[test]
@@ -1155,6 +1049,7 @@ fn voter_persists_its_votes() {
use futures::sync::mpsc;
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
// we have two authorities but we'll only be running the voter for alice
// we are going to be listening for the prevotes it casts
@@ -1164,13 +1059,11 @@ fn voter_persists_its_votes() {
// alice has a chain with 20 blocks
let mut net = GrandpaTestNet::new(TestApi::new(voters.clone()), 2);
net.peer(0).push_blocks(20, false);
net.sync();
net.block_until_sync(&mut runtime);
assert_eq!(net.peer(0).client().info().chain.best_number, 20,
"Peer #{} failed to sync", 0);
let mut runtime = current_thread::Runtime::new().unwrap();
let client = net.peer(0).client().clone();
let net = Arc::new(Mutex::new(net));
@@ -1195,7 +1088,7 @@ fn voter_persists_its_votes() {
name: Some(format!("peer#{}", 0)),
},
link: link,
network: MessageRouting::new(net.clone(), 0),
network: net.lock().peers[0].network_service().clone(),
inherent_data_providers: InherentDataProviders::new(),
on_exit: Exit,
telemetry_on_connect: None,
@@ -1253,9 +1146,8 @@ fn voter_persists_its_votes() {
set_state
};
let routing = MessageRouting::new(net.clone(), 1);
let (network, routing_work) = communication::NetworkBridge::new(
routing,
net.lock().peers[1].network_service().clone(),
config.clone(),
set_state,
Exit,
@@ -1290,25 +1182,33 @@ fn voter_persists_its_votes() {
// we push 20 more blocks to alice's chain
net.lock().peer(0).push_blocks(20, false);
net.lock().sync();
assert_eq!(net.lock().peer(0).client().info().chain.best_number, 40,
"Peer #{} failed to sync", 0);
let net2 = net.clone();
let net = net.clone();
let voter_tx = voter_tx.clone();
let round_tx = round_tx.clone();
future::Either::A(tokio_timer::Interval::new_interval(Duration::from_millis(200))
.take_while(move |_| {
Ok(net2.lock().peer(1).client().info().chain.best_number != 40)
})
.for_each(|_| Ok(()))
.and_then(move |_| {
#[allow(deprecated)]
let block_30_hash =
net.lock().peer(0).client().as_full().unwrap().backend().blockchain().hash(30).unwrap().unwrap();
#[allow(deprecated)]
let block_30_hash =
net.lock().peer(0).client().as_full().unwrap().backend().blockchain().hash(30).unwrap().unwrap();
// we restart alice's voter
voter_tx.unbounded_send(()).unwrap();
// we restart alice's voter
voter_tx.unbounded_send(()).unwrap();
// and we push our own prevote for block 30
let prevote = grandpa::Prevote {
target_number: 30,
target_hash: block_30_hash,
};
// and we push our own prevote for block 30
let prevote = grandpa::Prevote {
target_number: 30,
target_hash: block_30_hash,
};
round_tx.lock().start_send(grandpa::Message::Prevote(prevote)).unwrap();
round_tx.lock().start_send(grandpa::Message::Prevote(prevote)).unwrap();
Ok(())
}).map_err(|_| panic!()))
} else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 {
// the next message we receive should be our own prevote
@@ -1324,6 +1224,8 @@ fn voter_persists_its_votes() {
// therefore we won't ever receive it again since it will be a
// known message on the gossip layer
future::Either::B(future::ok(()))
} else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 {
// we then receive a precommit from alice for block 15
// even though we casted a prevote for block 30
@@ -1336,23 +1238,17 @@ fn voter_persists_its_votes() {
// signal exit
exit_tx.clone().lock().take().unwrap().send(()).unwrap();
future::Either::B(future::ok(()))
} else {
panic!()
}
Ok(())
}).map_err(|_| ()));
}
let net = net.clone();
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| {
net.lock().send_import_notifications();
net.lock().send_finality_notifications();
net.lock().sync_without_disconnects();
Ok(())
})
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let exit = exit_rx.into_future().map(|_| ()).map_err(|_| ());
runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap();
@@ -1361,12 +1257,13 @@ fn voter_persists_its_votes() {
#[test]
fn finalize_3_voters_1_light_observer() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let authorities = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob, AuthorityKeyring::Charlie];
let voters = make_ids(authorities);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 4);
net.peer(0).push_blocks(20, false);
net.sync();
net.block_until_sync(&mut runtime);
for i in 0..4 {
assert_eq!(net.peer(i).client().info().chain.best_number, 20,
@@ -1380,7 +1277,7 @@ fn finalize_3_voters_1_light_observer() {
.take_while(|n| Ok(n.header.number() < &20))
.collect();
run_to_completion_with(20, net.clone(), authorities, |executor| {
run_to_completion_with(&mut runtime, 20, net.clone(), authorities, |executor| {
executor.spawn(
run_grandpa_observer(
Config {
@@ -1390,7 +1287,7 @@ fn finalize_3_voters_1_light_observer() {
name: Some("observer".to_string()),
},
link,
MessageRouting::new(net.clone(), 3),
net.lock().peers[3].network_service().clone(),
Exit,
).unwrap()
).unwrap();
@@ -1402,6 +1299,7 @@ fn finalize_3_voters_1_light_observer() {
#[test]
fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let peers = &[AuthorityKeyring::Alice];
let mut net = GrandpaTestNet::new(TestApi::new(make_ids(peers)), 1);
@@ -1411,14 +1309,18 @@ fn finality_proof_is_fetched_by_light_client_when_consensus_data_changes() {
// && instead fetches finality proof for block #1
net.peer(0).push_authorities_change_block(vec![substrate_primitives::sr25519::Public::from_raw([42; 32])]);
let net = Arc::new(Mutex::new(net));
run_to_completion(1, net.clone(), peers);
net.lock().sync_without_disconnects();
run_to_completion(&mut runtime, 1, net.clone(), peers);
net.lock().block_until_sync(&mut runtime);
// check that the block#1 is finalized on light client
while net.lock().peer(1).client().info().chain.finalized_number != 1 {
net.lock().tick_peer(1);
net.lock().sync_without_disconnects();
}
runtime.block_on(futures::future::poll_fn(move || -> std::result::Result<_, ()> {
if net.lock().peer(1).client().info().chain.finalized_number == 1 {
Ok(Async::Ready(()))
} else {
net.lock().poll();
Ok(Async::NotReady)
}
})).unwrap()
}
#[test]
@@ -1427,6 +1329,7 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
const FORCE_CHANGE: bool = true;
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
// two of these guys are offline.
let genesis_authorities = if FORCE_CHANGE {
@@ -1452,40 +1355,36 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
let net = GrandpaTestNet::new(api, 3);
let net = Arc::new(Mutex::new(net));
let runner_net = net.clone();
let add_blocks = move |_| {
net.lock().peer(0).push_blocks(1, false); // best is #1
net.lock().peer(0).push_blocks(1, false); // best is #1
// add a forced transition at block 5.
if FORCE_CHANGE {
let parent_hash = net.lock().peer(0).client().info().chain.best_hash;
forced_transitions.lock().insert(parent_hash, (0, ScheduledChange {
next_authorities: voters.clone(),
delay: 3,
}));
}
// add a forced transition at block 5.
if FORCE_CHANGE {
let parent_hash = net.lock().peer(0).client().info().chain.best_hash;
forced_transitions.lock().insert(parent_hash, (0, ScheduledChange {
next_authorities: voters.clone(),
delay: 3,
}));
}
// ensure block#10 enacts authorities set change => justification is generated
// normally it will reach light client, but because of the forced change, it will not
net.lock().peer(0).push_blocks(8, false); // best is #9
net.lock().peer(0).push_authorities_change_block(
vec![substrate_primitives::sr25519::Public::from_raw([42; 32])]
); // #10
net.lock().peer(0).push_blocks(1, false); // best is #11
net.lock().sync_without_disconnects();
None
};
// ensure block#10 enacts authorities set change => justification is generated
// normally it will reach light client, but because of the forced change, it will not
net.lock().peer(0).push_blocks(8, false); // best is #9
net.lock().peer(0).push_authorities_change_block(
vec![substrate_primitives::sr25519::Public::from_raw([42; 32])]
); // #10
net.lock().peer(0).push_blocks(1, false); // best is #11
net.lock().block_until_sync(&mut runtime);
// finalize block #11 on full clients
run_to_completion_with(11, runner_net.clone(), peers_a, add_blocks);
run_to_completion(&mut runtime, 11, net.clone(), peers_a);
// request finalization by light client
runner_net.lock().add_light_peer(&GrandpaTestNet::default_config());
runner_net.lock().sync_without_disconnects();
net.lock().add_light_peer(&GrandpaTestNet::default_config());
net.lock().block_until_sync(&mut runtime);
// check block, finalized on light client
assert_eq!(
runner_net.lock().peer(3).client().info().chain.finalized_number,
net.lock().peer(3).client().info().chain.finalized_number,
if FORCE_CHANGE { 0 } else { 10 },
);
}
@@ -1493,20 +1392,19 @@ fn empty_finality_proof_is_returned_to_light_client_when_authority_set_is_differ
#[test]
fn voter_catches_up_to_latest_round_when_behind() {
let _ = env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob];
let voters = make_ids(peers);
let mut net = GrandpaTestNet::new(TestApi::new(voters), 3);
net.peer(0).push_blocks(50, false);
net.sync();
net.block_until_sync(&mut runtime);
let net = Arc::new(Mutex::new(net));
let mut finality_notifications = Vec::new();
let mut runtime = current_thread::Runtime::new().unwrap();
let voter = |local_key, peer_id, link, net| -> Box<dyn Future<Item=(), Error=()> + Send> {
let voter = |local_key, peer_id, link, net: Arc<Mutex<GrandpaTestNet>>| -> Box<dyn Future<Item=(), Error=()> + Send> {
let grandpa_params = GrandpaParams {
config: Config {
gossip_duration: TEST_GOSSIP_DURATION,
@@ -1515,7 +1413,7 @@ fn voter_catches_up_to_latest_round_when_behind() {
name: Some(format!("peer#{}", peer_id)),
},
link: link,
network: MessageRouting::new(net, peer_id),
network: net.lock().peer(peer_id).network_service().clone(),
inherent_data_providers: InherentDataProviders::new(),
on_exit: Exit,
telemetry_on_connect: None,
@@ -1590,10 +1488,6 @@ fn voter_catches_up_to_latest_round_when_behind() {
})
};
let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL)
.for_each(move |_| { net.lock().sync_without_disconnects(); Ok(()) })
.map(|_| ())
.map_err(|_| ());
let drive_to_completion = futures::future::poll_fn(|| { net.lock().poll(); Ok(Async::NotReady) });
let _ = runtime.block_on(test.select(drive_to_completion).map_err(|_| ())).unwrap();
}
+40 -12
View File
@@ -121,16 +121,8 @@ pub struct NetworkConfiguration {
pub client_version: String,
/// Name of the node. Sent over the wire for debugging purposes.
pub node_name: String,
/// If true, the network will use mDNS to discover other libp2p nodes on the local network
/// and connect to them if they support the same chain.
pub enable_mdns: bool,
/// Optional external implementation of a libp2p transport. Used in WASM contexts where we need
/// some binding between the networking provided by the operating system or environment and
/// libp2p.
///
/// This parameter exists whatever the target platform is, but it is expected to be set to
/// `Some` only when compiling for WASM.
pub wasm_external_transport: Option<wasm_ext::ExtTransport>,
/// Configuration for the transport layer.
pub transport: TransportConfig,
}
impl Default for NetworkConfiguration {
@@ -148,8 +140,10 @@ impl Default for NetworkConfiguration {
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "unknown".into(),
node_name: "unknown".into(),
enable_mdns: false,
wasm_external_transport: None,
transport: TransportConfig::Normal {
enable_mdns: false,
wasm_external_transport: None,
},
}
}
}
@@ -170,6 +164,40 @@ impl NetworkConfiguration {
];
config
}
/// Create new default configuration for localhost-only connection with random port (useful for testing)
pub fn new_memory() -> NetworkConfiguration {
let mut config = NetworkConfiguration::new();
config.listen_addresses = vec![
iter::once(Protocol::Ip4(Ipv4Addr::new(127, 0, 0, 1)))
.chain(iter::once(Protocol::Tcp(0)))
.collect()
];
config
}
}
/// Configuration for the transport layer.
#[derive(Clone)]
pub enum TransportConfig {
/// Normal transport mode.
Normal {
/// If true, the network will use mDNS to discover other libp2p nodes on the local network
/// and connect to them if they support the same chain.
enable_mdns: bool,
/// Optional external implementation of a libp2p transport. Used in WASM contexts where we
/// need some binding between the networking provided by the operating system or environment
/// and libp2p.
///
/// This parameter exists whatever the target platform is, but it is expected to be set to
/// `Some` only when compiling for WASM.
wasm_external_transport: Option<wasm_ext::ExtTransport>,
},
/// Only allow connections within the same process.
/// Only addresses of the form `/memory/...` will be supported.
MemoryOnly,
}
/// The policy for connections to non-reserved peers.
+13 -1
View File
@@ -22,7 +22,7 @@ use libp2p::kad::{GetValueResult, Kademlia, KademliaOut, PutValueResult};
use libp2p::multihash::Multihash;
use libp2p::multiaddr::Protocol;
use log::{debug, info, trace, warn};
use std::{cmp, num::NonZeroU8, time::Duration};
use std::{cmp, collections::VecDeque, num::NonZeroU8, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock};
@@ -37,6 +37,8 @@ pub struct DiscoveryBehaviour<TSubstream> {
next_kad_random_query: Delay,
/// After `next_kad_random_query` triggers, the next one triggers after this duration.
duration_to_next_kad: Duration,
/// Discovered nodes to return.
discoveries: VecDeque<PeerId>,
/// `Clock` instance that uses the current execution context's source of time.
clock: Clock,
/// Identity of our local node.
@@ -59,6 +61,7 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
kademlia,
next_kad_random_query: Delay::new(clock.now()),
duration_to_next_kad: Duration::from_secs(1),
discoveries: VecDeque::new(),
clock,
local_peer_id: local_public_key.into_peer_id(),
}
@@ -72,8 +75,11 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
/// Adds a hard-coded address for the given peer, that never expires.
///
/// This adds an entry to the parameter that was passed to `new`.
///
/// If we didn't know this address before, also generates a `Discovered` event.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
if self.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
self.discoveries.push_back(peer_id.clone());
self.user_defined.push((peer_id, addr));
}
}
@@ -181,6 +187,12 @@ where
Self::OutEvent,
>,
> {
// Immediately process the content of `discovered`.
if let Some(peer_id) = self.discoveries.pop_front() {
let ev = DiscoveryOut::Discovered(peer_id);
return Async::Ready(NetworkBehaviourAction::GenerateEvent(ev));
}
// Poll the stream that fires when we need to start a random Kademlia query.
loop {
match self.next_kad_random_query.poll() {
+26 -7
View File
@@ -23,7 +23,7 @@ use std::time::Duration;
use log::{warn, error, info};
use libp2p::core::swarm::NetworkBehaviour;
use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
use libp2p::multihash::Multihash;
use libp2p::{Multiaddr, multihash::Multihash};
use futures::{prelude::*, sync::oneshot, sync::mpsc};
use parking_lot::{Mutex, RwLock};
use crate::protocol_behaviour::ProtocolBehaviour;
@@ -40,7 +40,7 @@ use crate::protocol::{event::Event, message::Message};
use crate::protocol::on_demand::RequestData;
use crate::protocol::{self, Context, CustomMessageOutcome, ConnectedPeer, PeerInfo};
use crate::protocol::sync::SyncState;
use crate::config::Params;
use crate::config::{Params, TransportConfig};
use crate::error::Error;
use crate::protocol::specialization::NetworkSpecialization;
@@ -197,12 +197,19 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
user_agent,
local_public,
known_addresses,
params.network_config.enable_mdns
);
let (transport, bandwidth) = transport::build_transport(
local_identity,
params.network_config.wasm_external_transport
match params.network_config.transport {
TransportConfig::MemoryOnly => false,
TransportConfig::Normal { enable_mdns, .. } => enable_mdns,
}
);
let (transport, bandwidth) = {
let (config_mem, config_wasm) = match params.network_config.transport {
TransportConfig::MemoryOnly => (true, None),
TransportConfig::Normal { wasm_external_transport, .. } =>
(false, wasm_external_transport)
};
transport::build_transport(local_identity, config_mem, config_wasm)
};
(Swarm::<B, S, H>::new(transport, behaviour, local_peer_id.clone()), bandwidth)
};
@@ -281,6 +288,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker
self.network_service.lock().user_protocol_mut().num_sync_peers()
}
/// Adds an address for a node.
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.network_service.lock().add_known_address(peer_id, addr);
}
/// Return a `NetworkService` that can be shared through the code base and can be used to
/// manipulate the worker.
pub fn service(&self) -> &Arc<NetworkService<B, S, H>> {
@@ -349,6 +361,13 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
let _ = self.network_chan.unbounded_send(NetworkMsg::DisconnectPeer(who));
}
/// Request a justification for the given block.
pub fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self
.protocol_sender
.unbounded_send(ProtocolMsg::RequestJustification(hash.clone(), number));
}
/// Execute a closure with the chain-specific network specialization.
pub fn with_spec<F>(&self, f: F)
where F: FnOnce(&mut S, &mut dyn Context<B>) + Send + 'static
@@ -16,16 +16,14 @@
//! Testing block import logic.
use consensus::import_queue::{import_single_block, BasicQueue, BlockImportError, BlockImportResult};
use consensus::import_queue::{
import_single_block, IncomingBlock, BasicQueue, BlockImportError, BlockImportResult
};
use test_client::{self, prelude::*};
use test_client::runtime::{Block, Hash};
use runtime_primitives::generic::BlockId;
use super::*;
struct TestLink {}
impl Link<Block> for TestLink {}
fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock<Block>) {
let client = test_client::new();
let block = client.new_block(Default::default()).unwrap().bake().unwrap();
@@ -77,7 +75,7 @@ fn async_import_queue_drops() {
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = Arc::new(PassThroughVerifier(true));
let mut queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None, None, None);
let queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None, None, None);
drop(queue);
}
}
File diff suppressed because it is too large Load Diff
+182 -140
View File
@@ -16,13 +16,14 @@
use client::{backend::Backend, blockchain::HeaderBackend};
use crate::config::Roles;
use crate::message;
use consensus::BlockOrigin;
use std::collections::HashSet;
use std::{time::Duration, time::Instant};
use tokio::runtime::current_thread;
use super::*;
fn test_ancestor_search_when_common_is(n: usize) {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(n, false);
@@ -33,7 +34,7 @@ fn test_ancestor_search_when_common_is(n: usize) {
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.sync();
net.block_until_sync(&mut runtime);
assert!(net.peer(0).client.as_in_memory_backend().blockchain()
.canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain()));
}
@@ -41,28 +42,24 @@ fn test_ancestor_search_when_common_is(n: usize) {
#[test]
fn sync_peers_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.sync();
for peer in 0..3 {
// Assert peers is up to date.
assert_eq!(net.peer(peer).num_peers(), 2);
// And then disconnect.
for other in 0..3 {
if other != peer {
net.peer(peer).on_disconnect(net.peer(other));
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
for peer in 0..3 {
if net.peer(peer).num_peers() != 2 {
return Ok(Async::NotReady)
}
}
}
net.sync();
// Now peers are disconnected.
for peer in 0..3 {
assert_eq!(net.peer(peer).num_peers(), 0);
}
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn sync_cycle_from_offline_to_syncing_to_offline() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
for peer in 0..3 {
// Offline, and not major syncing.
@@ -72,63 +69,92 @@ fn sync_cycle_from_offline_to_syncing_to_offline() {
// Generate blocks.
net.peer(2).push_blocks(100, false);
net.start();
for peer in 0..3 {
// Online
assert!(!net.peer(peer).is_offline());
if peer < 2 {
// Major syncing.
assert!(net.peer(peer).is_major_syncing());
}
}
net.sync();
for peer in 0..3 {
// All done syncing.
assert!(!net.peer(peer).is_major_syncing());
}
// Now disconnect them all.
for peer in 0..3 {
for other in 0..3 {
if other != peer {
net.peer(peer).on_disconnect(net.peer(other));
// Block until all nodes are online and nodes 0 and 1 and major syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
for peer in 0..3 {
// Online
if net.peer(peer).is_offline() {
return Ok(Async::NotReady)
}
if peer < 2 {
// Major syncing.
if !net.peer(peer).is_major_syncing() {
return Ok(Async::NotReady)
}
}
}
net.sync();
assert!(net.peer(peer).is_offline());
assert!(!net.peer(peer).is_major_syncing());
}
Ok(Async::Ready(()))
})).unwrap();
// Block until all nodes are done syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
for peer in 0..3 {
if net.peer(peer).is_major_syncing() {
return Ok(Async::NotReady)
}
}
Ok(Async::Ready(()))
})).unwrap();
// Now drop nodes 1 and 2, and check that node 0 is offline.
net.peers.remove(2);
net.peers.remove(1);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if !net.peer(0).is_offline() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
}
#[test]
fn syncing_node_not_major_syncing_when_disconnected() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
// Generate blocks.
net.peer(2).push_blocks(100, false);
net.start();
net.sync_step();
// Peer 1 is major-syncing.
assert!(net.peer(1).is_major_syncing());
// Disconnect peer 1 form everyone else.
net.peer(1).on_disconnect(net.peer(0));
net.peer(1).on_disconnect(net.peer(2));
// Peer 1 is not major-syncing.
net.sync();
// Check that we're not major syncing when disconnected.
assert!(!net.peer(1).is_major_syncing());
// Check that we switch to major syncing.
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if !net.peer(1).is_major_syncing() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
// Destroy two nodes, and check that we switch to non-major syncing.
net.peers.remove(2);
net.peers.remove(0);
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).is_major_syncing() {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
}
#[test]
fn sync_from_two_peers_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.sync();
net.block_until_sync(&mut runtime);
assert!(net.peer(0).client.as_in_memory_backend().blockchain()
.equals_to(net.peer(1).client.as_in_memory_backend().blockchain()));
assert!(!net.peer(0).is_major_syncing());
@@ -137,11 +163,12 @@ fn sync_from_two_peers_works() {
#[test]
fn sync_from_two_peers_with_ancestry_search_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(10, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.sync();
net.block_until_sync(&mut runtime);
assert!(net.peer(0).client.as_in_memory_backend().blockchain()
.canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain()));
}
@@ -149,13 +176,14 @@ fn sync_from_two_peers_with_ancestry_search_works() {
#[test]
fn ancestry_search_works_when_backoff_is_one() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(1, false);
net.peer(1).push_blocks(2, false);
net.peer(2).push_blocks(2, false);
net.sync();
net.block_until_sync(&mut runtime);
assert!(net.peer(0).client.as_in_memory_backend().blockchain()
.canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain()));
}
@@ -163,13 +191,14 @@ fn ancestry_search_works_when_backoff_is_one() {
#[test]
fn ancestry_search_works_when_ancestor_is_genesis() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(13, true);
net.peer(1).push_blocks(100, false);
net.peer(2).push_blocks(100, false);
net.sync();
net.block_until_sync(&mut runtime);
assert!(net.peer(0).client.as_in_memory_backend().blockchain()
.canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain()));
}
@@ -191,9 +220,11 @@ fn ancestry_search_works_when_common_is_hundred() {
#[test]
fn sync_long_chain_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.peer(1).push_blocks(500, false);
net.sync();
net.block_until_sync(&mut runtime);
assert!(net.peer(0).client.as_in_memory_backend().blockchain()
.equals_to(net.peer(1).client.as_in_memory_backend().blockchain()));
}
@@ -201,10 +232,11 @@ fn sync_long_chain_works() {
#[test]
fn sync_no_common_longer_chain_fails() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.peer(0).push_blocks(20, true);
net.peer(1).push_blocks(20, false);
net.sync();
net.block_until_sync(&mut runtime);
assert!(!net.peer(0).client.as_in_memory_backend().blockchain()
.canon_equals_to(net.peer(1).client.as_in_memory_backend().blockchain()));
}
@@ -212,9 +244,10 @@ fn sync_no_common_longer_chain_fails() {
#[test]
fn sync_justifications() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = JustificationTestNet::new(3);
net.peer(0).push_blocks(20, false);
net.sync();
net.block_until_sync(&mut runtime);
// there's currently no justification for block #10
assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), None);
@@ -234,17 +267,26 @@ fn sync_justifications() {
net.peer(1).request_justification(&h2.hash().into(), 15);
net.peer(1).request_justification(&h3.hash().into(), 20);
net.sync();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
for height in (10..21).step_by(5) {
assert_eq!(net.peer(0).client().justification(&BlockId::Number(height)).unwrap(), Some(Vec::new()));
assert_eq!(net.peer(1).client().justification(&BlockId::Number(height)).unwrap(), Some(Vec::new()));
}
for height in (10..21).step_by(5) {
if net.peer(0).client().justification(&BlockId::Number(height)).unwrap() != Some(Vec::new()) {
return Ok(Async::NotReady);
}
if net.peer(1).client().justification(&BlockId::Number(height)).unwrap() != Some(Vec::new()) {
return Ok(Async::NotReady);
}
}
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn sync_justifications_across_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = JustificationTestNet::new(3);
// we push 5 blocks
net.peer(0).push_blocks(5, false);
@@ -254,24 +296,31 @@ fn sync_justifications_across_forks() {
// peer 1 will only see the longer fork. but we'll request justifications
// for both and finalize the small fork instead.
net.sync();
net.block_until_sync(&mut runtime);
net.peer(0).client().finalize_block(BlockId::Hash(f1_best), Some(Vec::new()), true).unwrap();
net.peer(1).request_justification(&f1_best, 10);
net.peer(1).request_justification(&f2_best, 11);
net.sync();
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), Some(Vec::new()));
assert_eq!(net.peer(1).client().justification(&BlockId::Number(10)).unwrap(), Some(Vec::new()));
if net.peer(0).client().justification(&BlockId::Number(10)).unwrap() == Some(Vec::new()) &&
net.peer(1).client().justification(&BlockId::Number(10)).unwrap() == Some(Vec::new())
{
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
})).unwrap();
}
#[test]
fn sync_after_fork_works() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.sync_step();
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
net.peer(2).push_blocks(30, false);
@@ -285,7 +334,7 @@ fn sync_after_fork_works() {
// peer 1 has the best chain
let peer1_chain = net.peer(1).client.as_in_memory_backend().blockchain().clone();
net.sync();
net.block_until_sync(&mut runtime);
assert!(net.peer(0).client.as_in_memory_backend().blockchain().canon_equals_to(&peer1_chain));
assert!(net.peer(1).client.as_in_memory_backend().blockchain().canon_equals_to(&peer1_chain));
assert!(net.peer(2).client.as_in_memory_backend().blockchain().canon_equals_to(&peer1_chain));
@@ -294,15 +343,15 @@ fn sync_after_fork_works() {
#[test]
fn syncs_all_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(4);
net.sync_step();
net.peer(0).push_blocks(2, false);
net.peer(1).push_blocks(2, false);
net.peer(0).push_blocks(2, true);
net.peer(1).push_blocks(4, false);
net.sync();
net.block_until_sync(&mut runtime);
// Check that all peers have all of the blocks.
assert_eq!(9, net.peer(0).client.as_in_memory_backend().blockchain().blocks_count());
assert_eq!(9, net.peer(1).client.as_in_memory_backend().blockchain().blocks_count());
@@ -311,13 +360,12 @@ fn syncs_all_forks() {
#[test]
fn own_blocks_are_announced() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(3);
net.sync(); // connect'em
net.block_until_sync(&mut runtime); // connect'em
net.peer(0).generate_blocks(1, BlockOrigin::Own, |builder| builder.bake().unwrap());
let header = net.peer(0).client().header(&BlockId::Number(1)).unwrap().unwrap();
net.peer(0).on_block_imported(header.hash(), &header);
net.sync();
net.block_until_sync(&mut runtime);
assert_eq!(net.peer(0).client.as_in_memory_backend().blockchain().info().best_number, 1);
assert_eq!(net.peer(1).client.as_in_memory_backend().blockchain().info().best_number, 1);
@@ -329,6 +377,7 @@ fn own_blocks_are_announced() {
#[test]
fn blocks_are_not_announced_by_light_nodes() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(0);
// full peer0 is connected to light peer
@@ -336,35 +385,32 @@ fn blocks_are_not_announced_by_light_nodes() {
let mut light_config = ProtocolConfig::default();
light_config.roles = Roles::LIGHT;
net.add_full_peer(&ProtocolConfig::default());
net.add_full_peer(&light_config);
net.add_full_peer(&ProtocolConfig::default());
net.add_light_peer(&light_config);
// Sync between 0 and 1.
net.peer(0).push_blocks(1, false);
net.peer(0).start();
net.peer(1).start();
net.peer(2).start();
net.peer(0).on_connect(net.peer(1));
net.peer(1).on_connect(net.peer(2));
// Only sync between 0 -> 1, and 1 -> 2
let mut disconnected = HashSet::new();
disconnected.insert(0);
disconnected.insert(2);
net.sync_with(true, Some(disconnected));
// peer 0 has the best chain
// peer 1 has the best chain
// peer 2 has genesis-chain only
assert_eq!(net.peer(0).client.info().chain.best_number, 1);
net.block_until_sync(&mut runtime);
assert_eq!(net.peer(1).client.info().chain.best_number, 1);
assert_eq!(net.peer(2).client.info().chain.best_number, 0);
// Add another node and remove node 0.
net.add_full_peer(&ProtocolConfig::default());
net.peers.remove(0);
// Poll for a few seconds and make sure 1 and 2 (now 0 and 1) don't sync together.
let mut delay = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(5));
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
delay.poll().map_err(|_| ())
})).unwrap();
assert_eq!(net.peer(1).client.info().chain.best_number, 0);
}
#[test]
fn can_sync_small_non_best_forks() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
let mut net = TestNet::new(2);
net.sync_step();
net.peer(0).push_blocks(30, false);
net.peer(1).push_blocks(30, false);
@@ -381,7 +427,15 @@ fn can_sync_small_non_best_forks() {
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none());
net.sync();
// poll until the two nodes connect, otherwise announcing the block will not work
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
if net.peer(0).num_peers() == 0 {
Ok(Async::NotReady)
} else {
Ok(Async::Ready(()))
}
})).unwrap();
// synchronization: 0 synced to longer chain and 1 didn't sync to small chain.
@@ -391,17 +445,24 @@ fn can_sync_small_non_best_forks() {
assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
net.peer(0).announce_block(small_hash);
net.sync();
// after announcing, peer 1 downloads the block.
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
assert!(net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some());
if net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_none() {
return Ok(Async::NotReady)
}
Ok(Async::Ready(()))
})).unwrap();
}
#[test]
fn can_not_sync_from_light_peer() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
// given the network with 1 full nodes (#0) and 1 light node (#1)
let mut net = TestNet::new(1);
@@ -411,8 +472,7 @@ fn can_not_sync_from_light_peer() {
net.peer(0).push_blocks(1, false);
// and let the light client sync from this node
// (mind the #1 is disconnected && not syncing)
net.sync();
net.block_until_sync(&mut runtime);
// ensure #0 && #1 have the same best block
let full0_info = net.peer(0).client.info().chain;
@@ -421,52 +481,34 @@ fn can_not_sync_from_light_peer() {
assert_eq!(light_info.best_number, 1);
assert_eq!(light_info.best_hash, full0_info.best_hash);
// add new full client (#2) && sync without #0
// add new full client (#2) && remove #0
net.add_full_peer(&Default::default());
net.peer(1).on_connect(net.peer(2));
net.peer(2).on_connect(net.peer(1));
net.peer(1).announce_block(light_info.best_hash);
net.sync_with(true, Some(vec![0].into_iter().collect()));
net.peers.remove(0);
// ensure that the #2 has failed to sync block #1
assert_eq!(net.peer(2).client.info().chain.best_number, 0);
// and that the #1 is still connected to #2
// (because #2 has not tried to fetch block data from the #1 light node)
assert_eq!(net.peer(1).num_peers(), 2);
// and now try to fetch block data from light peer #1
// (this should result in disconnect)
net.peer(1).receive_message(
&net.peer(2).peer_id,
message::generic::Message::BlockRequest(message::generic::BlockRequest {
id: 0,
fields: message::BlockAttributes::HEADER,
from: message::FromBlock::Hash(light_info.best_hash),
to: None,
direction: message::Direction::Ascending,
max: Some(1),
}),
);
net.sync();
// check that light #1 has disconnected from #2
assert_eq!(net.peer(1).num_peers(), 1);
// ensure that the #2 (now #1) fails to sync block #1 even after 5 seconds
let mut test_finished = tokio_timer::Delay::new(Instant::now() + Duration::from_secs(5));
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| -> Result<_, ()> {
net.poll();
test_finished.poll().map_err(|_| ())
})).unwrap();
}
#[test]
fn light_peer_imports_header_from_announce() {
let _ = ::env_logger::try_init();
let mut runtime = current_thread::Runtime::new().unwrap();
fn import_with_announce(net: &mut TestNet, hash: H256) {
let header = net.peer(0).client().header(&BlockId::Hash(hash)).unwrap().unwrap();
net.peer(1).receive_message(
&net.peer(0).peer_id,
message::generic::Message::BlockAnnounce(message::generic::BlockAnnounce {
header,
}),
);
fn import_with_announce(net: &mut TestNet, runtime: &mut current_thread::Runtime, hash: H256) {
net.peer(0).announce_block(hash);
net.peer(1).import_queue_sync();
assert!(net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some());
runtime.block_on(futures::future::poll_fn::<(), (), _>(|| {
net.poll();
if net.peer(1).client().header(&BlockId::Hash(hash)).unwrap().is_some() {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
})).unwrap();
}
// given the network with 1 full nodes (#0) and 1 light node (#1)
@@ -474,13 +516,13 @@ fn light_peer_imports_header_from_announce() {
net.add_light_peer(&Default::default());
// let them connect to each other
net.sync();
net.block_until_sync(&mut runtime);
// check that NEW block is imported from announce message
let new_hash = net.peer(0).push_blocks(1, false);
import_with_announce(&mut net, new_hash);
import_with_announce(&mut net, &mut runtime, new_hash);
// check that KNOWN STALE block is imported from announce message
let known_stale_hash = net.peer(0).push_blocks_at(BlockId::Number(0), 1, true);
import_with_announce(&mut net, known_stale_hash);
import_with_announce(&mut net, &mut runtime, known_stale_hash);
}
+16 -3
View File
@@ -30,10 +30,14 @@ pub use self::bandwidth::BandwidthSinks;
/// Builds the transport that serves as a common ground for all connections.
///
/// If `memory_only` is true, then only communication within the same process are allowed. Only
/// addresses with the format `/memory/...` are allowed.
///
/// Returns a `BandwidthSinks` object that allows querying the average bandwidth produced by all
/// the connections spawned with this transport.
pub fn build_transport(
keypair: identity::Keypair,
memory_only: bool,
wasm_external_transport: Option<wasm_ext::ExtTransport>
) -> (Boxed<(PeerId, StreamMuxerBox), io::Error>, Arc<bandwidth::BandwidthSinks>) {
// Build configuration objects for encryption mechanisms.
@@ -63,12 +67,21 @@ pub fn build_transport(
OptionalTransport::none()
};
#[cfg(not(target_os = "unknown"))]
let transport = {
let transport = transport.or_transport(if !memory_only {
let desktop_trans = tcp::TcpConfig::new();
let desktop_trans = websocket::WsConfig::new(desktop_trans.clone())
.or_transport(desktop_trans);
transport.or_transport(dns::DnsConfig::new(desktop_trans))
};
OptionalTransport::some(dns::DnsConfig::new(desktop_trans))
} else {
OptionalTransport::none()
});
let transport = transport.or_transport(if memory_only {
OptionalTransport::some(libp2p::core::transport::MemoryTransport::default())
} else {
OptionalTransport::none()
});
let (transport, sinks) = bandwidth::BandwidthLogging::new(transport, Duration::from_secs(5));
// Encryption
+5 -3
View File
@@ -35,7 +35,7 @@ use service::{
FactoryExtrinsic,
};
use network::{multiaddr, Multiaddr, ManageNetwork};
use network::config::{NetworkConfiguration, NodeKeyConfig, Secret, NonReservedPeerMode};
use network::config::{NetworkConfiguration, TransportConfig, NodeKeyConfig, Secret, NonReservedPeerMode};
use sr_primitives::generic::BlockId;
use consensus::{ImportBlock, BlockImport};
@@ -160,8 +160,10 @@ fn node_config<F: ServiceFactory> (
non_reserved_mode: NonReservedPeerMode::Accept,
client_version: "network/test/0.1".to_owned(),
node_name: "unknown".to_owned(),
enable_mdns: false,
wasm_external_transport: None,
transport: TransportConfig::Normal {
enable_mdns: false,
wasm_external_transport: None,
},
};
Configuration {