From 4a44a07fd0ac93c6c27eb4e0ba3a35eae4e9580d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 18 Jun 2019 10:36:42 +0200 Subject: [PATCH] Move libp2p tests to custom_proto (#2884) --- .../core/network/src/custom_proto/handler.rs | 2 +- .../core/network/src/custom_proto/mod.rs | 1 + .../src/{service => custom_proto}/tests.rs | 293 ++++++++++-------- .../core/network/src/custom_proto/upgrade.rs | 13 - substrate/core/network/src/service.rs | 2 - 5 files changed, 161 insertions(+), 150 deletions(-) rename substrate/core/network/src/{service => custom_proto}/tests.rs (55%) diff --git a/substrate/core/network/src/custom_proto/handler.rs b/substrate/core/network/src/custom_proto/handler.rs index 0400c27f82..0ec60e79cd 100644 --- a/substrate/core/network/src/custom_proto/handler.rs +++ b/substrate/core/network/src/custom_proto/handler.rs @@ -585,7 +585,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { ProtocolState::Init { .. } | ProtocolState::Opening { .. } | ProtocolState::Normal { .. } => KeepAlive::Yes, ProtocolState::Disabled { .. } | ProtocolState::Poisoned | - ProtocolState::KillAsap => KeepAlive::No, + ProtocolState::KillAsap => KeepAlive::No, } } diff --git a/substrate/core/network/src/custom_proto/mod.rs b/substrate/core/network/src/custom_proto/mod.rs index a4fdebbb31..22c66c1654 100644 --- a/substrate/core/network/src/custom_proto/mod.rs +++ b/substrate/core/network/src/custom_proto/mod.rs @@ -20,3 +20,4 @@ pub use self::upgrade::CustomMessage; mod behaviour; mod handler; mod upgrade; +mod tests; diff --git a/substrate/core/network/src/service/tests.rs b/substrate/core/network/src/custom_proto/tests.rs similarity index 55% rename from substrate/core/network/src/service/tests.rs rename to substrate/core/network/src/custom_proto/tests.rs index f18f471e7a..3eb9eb77ac 100644 --- a/substrate/core/network/src/service/tests.rs +++ b/substrate/core/network/src/custom_proto/tests.rs @@ -16,79 +16,176 @@ #![cfg(test)] -use futures::{future, stream, prelude::*, try_ready}; -use libp2p::core::swarm::ExpandedSwarm; +use futures::{future, prelude::*, try_ready}; +use libp2p::core::{nodes::Substream, swarm::Swarm}; +use libp2p::core::{transport::boxed::Boxed, muxing::StreamMuxerBox}; +use libp2p::core::{ProtocolsHandler, protocols_handler::IntoProtocolsHandler}; +use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; +use libp2p::core::swarm::PollParameters; +use libp2p::{PeerId, Multiaddr, Transport}; use rand::seq::SliceRandom; -use runtime_primitives::traits::Block as BlockT; use std::{io, time::Duration, time::Instant}; use test_client::runtime::Block; -use crate::protocol::message::generic::Message; -use crate::{Multiaddr, multiaddr::Protocol, build_multiaddr}; -use crate::custom_proto::CustomProtoOut; -use super::{start_service, Swarm}; +use crate::protocol::message::{Message as MessageAlias, generic::Message}; +use crate::custom_proto::{CustomProto, CustomProtoOut, CustomMessage}; -/// Builds two services. The second one and further have the first one as its bootstrap node. +/// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. -fn build_nodes(num: usize, base_port: u16) -> Vec> { - let mut result: Vec> = Vec::with_capacity(num); - let mut first_addr = None::; +fn build_nodes() +-> ( + Swarm, CustomProtoWithAddr>, + Swarm, CustomProtoWithAddr> +) { + let mut out = Vec::with_capacity(2); - for index in 0 .. num { - let mut boot_nodes = Vec::new(); + let keypairs: Vec<_> = (0..2).map(|_| libp2p::identity::Keypair::generate_ed25519()).collect(); + let addrs: Vec = (0..2) + .map(|_| format!("/memory/{}", rand::random::()).parse().unwrap()) + .collect(); - if let Some(first_addr) = first_addr.as_ref() { - boot_nodes.push(first_addr.clone() - .with(Protocol::P2p(ExpandedSwarm::local_peer_id(&result[0]).clone().into())) - .to_string()); - } + for index in 0 .. 2 { + let transport = libp2p::core::transport::MemoryTransport + .with_upgrade(libp2p::secio::SecioConfig::new(keypairs[index].clone())) + .and_then(move |out, endpoint| { + let peer_id = out.remote_key.into_peer_id(); + libp2p::core::upgrade::apply(out.stream, libp2p::yamux::Config::default(), endpoint) + .map(|muxer| (peer_id, libp2p::core::muxing::StreamMuxerBox::new(muxer))) + }) + .with_timeout(Duration::from_secs(20)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) + .boxed(); - let config = crate::config::NetworkConfiguration { - listen_addresses: vec![build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(base_port + index as u16)]], - boot_nodes, - ..crate::config::NetworkConfiguration::default() + let (peerset, _) = peerset::Peerset::from_config(peerset::PeersetConfig { + in_peers: 25, + out_peers: 25, + bootnodes: keypairs + .iter() + .enumerate() + .filter_map(|(n, p)| if n != index { Some(p.public().into_peer_id()) } else { None }) + .collect(), + reserved_only: false, + reserved_nodes: Vec::new(), + }); + + let behaviour = CustomProtoWithAddr { + inner: CustomProto::new(&b"test"[..], &[1], peerset), + addrs: addrs + .iter() + .enumerate() + .filter_map(|(n, a)| if n != index { + Some((keypairs[n].public().into_peer_id(), a.clone())) + } else { + None + }) + .collect(), }; - if first_addr.is_none() { - first_addr = Some(config.listen_addresses.iter().next().unwrap().clone()); - } - - result.push(start_service::(config, &b"tst"[..], &[1]).unwrap().0); + let mut swarm = libp2p::core::swarm::Swarm::new( + transport, + behaviour, + keypairs[index].public().into_peer_id() + ); + Swarm::listen_on(&mut swarm, addrs[index].clone()).unwrap(); + out.push(swarm); } - result + // Final output + let mut out_iter = out.into_iter(); + let first = out_iter.next().unwrap(); + let second = out_iter.next().unwrap(); + (first, second) } -#[test] -fn basic_two_nodes_connectivity() { - let (mut service1, mut service2) = { - let mut l = build_nodes::(2, 50400).into_iter(); - let a = l.next().unwrap(); - let b = l.next().unwrap(); - (a, b) - }; +/// Wraps around the `CustomBehaviour` network behaviour, and adds hardcoded node addresses to it. +struct CustomProtoWithAddr { + inner: CustomProto>, + addrs: Vec<(PeerId, Multiaddr)>, +} - let fut1 = future::poll_fn(move || -> io::Result<_> { - match try_ready!(service1.poll()) { - Some(CustomProtoOut::CustomProtocolOpen { version, .. }) => { - assert_eq!(version, 1); - Ok(Async::Ready(())) - }, - _ => panic!(), +impl std::ops::Deref for CustomProtoWithAddr { + type Target = CustomProto>; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl std::ops::DerefMut for CustomProtoWithAddr { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl NetworkBehaviour for CustomProtoWithAddr { + type ProtocolsHandler = + > as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = > as NetworkBehaviour>::OutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.inner.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + let mut list = self.inner.addresses_of_peer(peer_id); + for (p, a) in self.addrs.iter() { + if p == peer_id { + list.push(a.clone()); + } } - }); + list + } - let fut2 = future::poll_fn(move || -> io::Result<_> { - match try_ready!(service2.poll()) { - Some(CustomProtoOut::CustomProtocolOpen { version, .. }) => { - assert_eq!(version, 1); - Ok(Async::Ready(())) - }, - _ => panic!(), - } - }); + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + self.inner.inject_connected(peer_id, endpoint) + } - let combined = fut1.select(fut2).map_err(|(err, _)| err); - let _ = tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap(); + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + self.inner.inject_disconnected(peer_id, endpoint) + } + + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: <::Handler as ProtocolsHandler>::OutEvent + ) { + self.inner.inject_node_event(peer_id, event) + } + + fn poll( + &mut self, + params: &mut PollParameters + ) -> Async< + NetworkBehaviourAction< + <::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent + > + > { + self.inner.poll(params) + } + + fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { + self.inner.inject_replaced(peer_id, closed_endpoint, new_endpoint) + } + + fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) { + self.inner.inject_addr_reach_failure(peer_id, addr, error) + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + self.inner.inject_dial_failure(peer_id) + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + self.inner.inject_new_listen_addr(addr) + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + self.inner.inject_expired_listen_addr(addr) + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + self.inner.inject_new_external_addr(addr) + } } #[test] @@ -100,19 +197,14 @@ fn two_nodes_transfer_lots_of_packets() { // substreams allowed by the multiplexer. const NUM_PACKETS: u32 = 5000; - let (mut service1, mut service2) = { - let mut l = build_nodes::(2, 50450).into_iter(); - let a = l.next().unwrap(); - let b = l.next().unwrap(); - (a, b) - }; + let (mut service1, mut service2) = build_nodes::>(); let fut1 = future::poll_fn(move || -> io::Result<_> { loop { match try_ready!(service1.poll()) { Some(CustomProtoOut::CustomProtocolOpen { peer_id, .. }) => { for n in 0 .. NUM_PACKETS { - service1.user_protocol_mut().send_packet( + service1.send_packet( &peer_id, Message::ChainSpecific(vec![(n % 256) as u8]) ); @@ -144,71 +236,9 @@ fn two_nodes_transfer_lots_of_packets() { let _ = tokio::runtime::Runtime::new().unwrap().block_on(combined).unwrap(); } -#[test] -fn many_nodes_connectivity() { - // Creates many nodes, then make sure that they are all connected to each other. - // Note: if you increase this number, keep in mind that there's a limit to the number of - // simultaneous connections which will make the test fail if it is reached. This can be - // increased in the `NetworkConfiguration`. - const NUM_NODES: usize = 25; - - let mut futures = build_nodes::(NUM_NODES, 50500) - .into_iter() - .map(move |mut node| { - let mut num_connecs = 0; - stream::poll_fn(move || -> io::Result<_> { - loop { - match try_ready!(node.poll()) { - Some(CustomProtoOut::CustomProtocolOpen { .. }) => { - num_connecs += 1; - assert!(num_connecs < NUM_NODES); - if num_connecs == NUM_NODES - 1 { - return Ok(Async::Ready(Some(true))) - } - } - Some(CustomProtoOut::CustomProtocolClosed { .. }) => { - let was_success = num_connecs == NUM_NODES - 1; - num_connecs -= 1; - if was_success && num_connecs < NUM_NODES - 1 { - return Ok(Async::Ready(Some(false))) - } - } - _ => panic!(), - } - } - }) - }) - .collect::>(); - - let mut successes = 0; - let combined = future::poll_fn(move || -> io::Result<_> { - for node in futures.iter_mut() { - match node.poll()? { - Async::Ready(Some(true)) => successes += 1, - Async::Ready(Some(false)) => successes -= 1, - Async::Ready(None) => unreachable!(), - Async::NotReady => () - } - } - - if successes == NUM_NODES { - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) - } - }); - - tokio::runtime::Runtime::new().unwrap().block_on(combined).unwrap(); -} - #[test] fn basic_two_nodes_requests_in_parallel() { - let (mut service1, mut service2) = { - let mut l = build_nodes::(2, 50550).into_iter(); - let a = l.next().unwrap(); - let b = l.next().unwrap(); - (a, b) - }; + let (mut service1, mut service2) = build_nodes::>(); // Generate random messages with or without a request id. let mut to_send = { @@ -230,7 +260,7 @@ fn basic_two_nodes_requests_in_parallel() { match try_ready!(service1.poll()) { Some(CustomProtoOut::CustomProtocolOpen { peer_id, .. }) => { for msg in to_send.drain(..) { - service1.user_protocol_mut().send_packet(&peer_id, msg); + service1.send_packet(&peer_id, msg); } }, _ => panic!(), @@ -263,12 +293,7 @@ fn reconnect_after_disconnect() { // We connect two nodes together, then force a disconnect (through the API of the `Service`), // check that the disconnect worked, and finally check whether they successfully reconnect. - let (mut service1, mut service2) = { - let mut l = build_nodes::(2, 50350).into_iter(); - let a = l.next().unwrap(); - let b = l.next().unwrap(); - (a, b) - }; + let (mut service1, mut service2) = build_nodes::>(); // We use the `current_thread` runtime because it doesn't require us to have `'static` futures. let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); @@ -290,7 +315,7 @@ fn reconnect_after_disconnect() { ServiceState::NotConnected => { service1_state = ServiceState::FirstConnec; if service2_state == ServiceState::FirstConnec { - service1.user_protocol_mut().disconnect_peer(ExpandedSwarm::local_peer_id(&service2)); + service1.disconnect_peer(Swarm::local_peer_id(&service2)); } }, ServiceState::Disconnected => service1_state = ServiceState::ConnectedAgain, @@ -314,7 +339,7 @@ fn reconnect_after_disconnect() { ServiceState::NotConnected => { service2_state = ServiceState::FirstConnec; if service1_state == ServiceState::FirstConnec { - service1.user_protocol_mut().disconnect_peer(ExpandedSwarm::local_peer_id(&service2)); + service1.disconnect_peer(Swarm::local_peer_id(&service2)); } }, ServiceState::Disconnected => service2_state = ServiceState::ConnectedAgain, diff --git a/substrate/core/network/src/custom_proto/upgrade.rs b/substrate/core/network/src/custom_proto/upgrade.rs index 0df280ad1a..9ede475349 100644 --- a/substrate/core/network/src/custom_proto/upgrade.rs +++ b/substrate/core/network/src/custom_proto/upgrade.rs @@ -141,19 +141,6 @@ pub trait CustomMessage { where Self: Sized; } -// This trait implementation exist mostly for testing convenience. This should eventually be -// removed. - -impl CustomMessage for Vec { - fn into_bytes(self) -> Vec { - self - } - - fn from_bytes(bytes: &[u8]) -> Result { - Ok(bytes.to_vec()) - } -} - /// Event produced by the `RegisteredProtocolSubstream`. #[derive(Debug, Clone)] pub enum RegisteredProtocolEvent { diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 59618aa5b4..a825becbe5 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -44,8 +44,6 @@ use crate::config::Params; use crate::error::Error; use crate::protocol::specialization::NetworkSpecialization; -mod tests; - /// Interval at which we send status updates on the status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); /// Interval at which we update the `peers` field on the main thread.