diff --git a/substrate/core/network-libp2p/tests/test.rs b/substrate/core/network-libp2p/tests/test.rs index 8120e15c3a..5ff4255965 100644 --- a/substrate/core/network-libp2p/tests/test.rs +++ b/substrate/core/network-libp2p/tests/test.rs @@ -14,41 +14,44 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use futures::{future, prelude::*, try_ready}; +use futures::{future, stream, prelude::*, try_ready}; use std::{io, iter}; use substrate_network_libp2p::{ServiceEvent, multiaddr}; -/// Builds two services. The second one has the first one as its bootstrap node. +/// Builds two services. The second one and further have the first one as its bootstrap node. /// This is to be used only for testing, and a panic will happen if something goes wrong. -fn build_two_nodes() -> (substrate_network_libp2p::Service, substrate_network_libp2p::Service) { - let service1 = { - let config = substrate_network_libp2p::NetworkConfiguration { - listen_addresses: vec![multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]], - ..substrate_network_libp2p::NetworkConfiguration ::default() - }; - let proto = substrate_network_libp2p::RegisteredProtocol::new(*b"tst", &[1]); - substrate_network_libp2p::start_service(config, iter::once(proto)).unwrap() - }; +fn build_nodes(num: usize) -> Vec { + let mut result: Vec = Vec::with_capacity(num); - let service2 = { - let mut bootnode = service1.listeners().next().unwrap().clone(); - bootnode.append(libp2p::multiaddr::Protocol::P2p(service1.peer_id().clone().into())); + for _ in 0 .. num { + let mut boot_nodes = Vec::new(); + if !result.is_empty() { + let mut bootnode = result[0].listeners().next().unwrap().clone(); + bootnode.append(libp2p::multiaddr::Protocol::P2p(result[0].peer_id().clone().into())); + boot_nodes.push(bootnode.to_string()); + } let config = substrate_network_libp2p::NetworkConfiguration { listen_addresses: vec![multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]], - boot_nodes: vec![bootnode.to_string()], + boot_nodes, ..substrate_network_libp2p::NetworkConfiguration::default() }; - let proto = substrate_network_libp2p::RegisteredProtocol::new(*b"tst", &[1]); - substrate_network_libp2p::start_service(config, iter::once(proto)).unwrap() - }; - (service1, service2) + let proto = substrate_network_libp2p::RegisteredProtocol::new(*b"tst", &[1]); + result.push(substrate_network_libp2p::start_service(config, iter::once(proto)).unwrap()); + } + + result } #[test] fn basic_two_nodes_connectivity() { - let (mut service1, mut service2) = build_two_nodes(); + let (mut service1, mut service2) = { + let mut l = build_nodes(2).into_iter(); + let a = l.next().unwrap(); + let b = l.next().unwrap(); + (a, b) + }; let fut1 = future::poll_fn(move || -> io::Result<_> { match try_ready!(service1.poll()) { @@ -75,3 +78,103 @@ fn basic_two_nodes_connectivity() { let combined = fut1.select(fut2).map_err(|(err, _)| err); tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap(); } + +#[test] +fn two_nodes_transfer_lots_of_packets() { + // We spawn two nodes, then make the first one send lots of packets to the second one. The test + // ends when the second one has received all of them. + const NUM_PACKETS: u32 = 20000; + + let (mut service1, mut service2) = { + let mut l = build_nodes(2).into_iter(); + let a = l.next().unwrap(); + let b = l.next().unwrap(); + (a, b) + }; + + let fut1 = future::poll_fn(move || -> io::Result<_> { + loop { + match try_ready!(service1.poll()) { + Some(ServiceEvent::OpenedCustomProtocol { node_index, protocol, .. }) => { + for n in 0 .. NUM_PACKETS { + service1.send_custom_message(node_index, protocol, vec![(n % 256) as u8]); + } + }, + _ => panic!(), + } + } + }); + + let mut packet_counter = 0u32; + let fut2 = future::poll_fn(move || -> io::Result<_> { + loop { + match try_ready!(service2.poll()) { + Some(ServiceEvent::OpenedCustomProtocol { .. }) => {}, + Some(ServiceEvent::CustomMessage { data, .. }) => { + assert_eq!(data.len(), 1); + assert_eq!(u32::from(data[0]), packet_counter % 256); + packet_counter += 1; + if packet_counter == NUM_PACKETS { + return Ok(Async::Ready(())) + } + } + _ => panic!(), + } + } + }); + + let combined = fut1.select(fut2).map_err(|(err, _)| err); + tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap(); +} + +#[test] +fn many_nodes_connectivity() { + // Creates many nodes, then make sure that they are all connected to each other. + // Note: if you increase this number, keep in mind that there's a limit to the number of + // simultaneous connections which will make the test fail if it is reached. This can be + // increased in the `NetworkConfiguration`. + const NUM_NODES: usize = 25; + + let mut futures = build_nodes(NUM_NODES) + .into_iter() + .map(move |mut node| { + let mut num_connecs = 0; + stream::poll_fn(move || -> io::Result<_> { + loop { + match try_ready!(node.poll()) { + Some(ServiceEvent::OpenedCustomProtocol { .. }) => { + num_connecs += 1; + if num_connecs == NUM_NODES - 1 { + return Ok(Async::Ready(Some(()))) + } + } + // TODO: we sometimes receive a closed connection event; maybe this is + // benign, but it would be nice to figure out why + // (https://github.com/libp2p/rust-libp2p/issues/844) + Some(ServiceEvent::ClosedCustomProtocol { .. }) => {} + _ => panic!(), + } + } + }) + }) + .collect::>(); + + let mut successes = 0; + let combined = future::poll_fn(move || -> io::Result<_> { + for node in futures.iter_mut() { + match node.poll()? { + Async::Ready(Some(_)) => successes += 1, + Async::Ready(None) => unreachable!(), + Async::NotReady => () + } + } + + if successes == NUM_NODES { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } + }); + + tokio::runtime::Runtime::new().unwrap().block_on(combined).unwrap(); +}