Update to libp2p v0.7.0 (#2343)

* Update to libp2p master

* Fix tests

* More tests fixing
This commit is contained in:
Pierre Krieger
2019-04-23 19:46:30 +02:00
committed by Gavin Wood
parent e2bb429711
commit 3f06fe32f3
13 changed files with 1437 additions and 446 deletions
+1 -1
View File
@@ -13,7 +13,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { version = "0.6.0", default-features = false, features = ["secio-secp256k1", "libp2p-websocket"] }
libp2p = { version = "0.7.0", default-features = false, features = ["secio-secp256k1", "libp2p-websocket"] }
parking_lot = "0.7.1"
lazy_static = "1.2"
log = "0.4"
+22 -7
View File
@@ -24,8 +24,9 @@ use libp2p::core::swarm::toggle::Toggle;
use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo};
use libp2p::kad::{Kademlia, KademliaOut};
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::ping::{Ping, PingEvent};
use log::{debug, trace, warn};
use libp2p::multiaddr::Protocol;
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
use log::{debug, info, trace, warn};
use std::{cmp, io, fmt, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock};
@@ -68,14 +69,14 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
let custom_protocols = CustomProto::new(protocol, peerset);
let mut kademlia = Kademlia::new(local_public_key.into_peer_id());
let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id());
for (peer_id, addr) in &known_addresses {
kademlia.add_connected_address(peer_id, addr.clone());
}
let clock = Clock::new();
Behaviour {
ping: Ping::new(),
ping: Ping::new(PingConfig::new()),
custom_protocols,
discovery: DiscoveryBehaviour {
user_defined: known_addresses,
@@ -83,6 +84,7 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
next_kad_random_query: Delay::new(clock.now()),
duration_to_next_kad: Duration::from_secs(1),
clock,
local_peer_id: local_public_key.into_peer_id(),
},
identify,
mdns: if enable_mdns {
@@ -293,10 +295,11 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behavio
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TMessage, TSubstream> {
fn inject_event(&mut self, event: PingEvent) {
match event {
PingEvent::PingSuccess { peer, time } => {
trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer, time);
self.events.push(BehaviourOut::PingSuccess { peer_id: peer, ping_time: time });
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer, rtt);
self.events.push(BehaviourOut::PingSuccess { peer_id: peer, ping_time: rtt });
}
_ => ()
}
}
}
@@ -335,6 +338,8 @@ pub struct DiscoveryBehaviour<TSubstream> {
duration_to_next_kad: Duration,
/// `Clock` instance that uses the current execution context's source of time.
clock: Clock,
/// Identity of our local node.
local_peer_id: PeerId,
}
impl<TSubstream> NetworkBehaviour for DiscoveryBehaviour<TSubstream>
@@ -386,6 +391,16 @@ where
NetworkBehaviour::inject_node_event(&mut self.kademlia, peer_id, event)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
let new_addr = addr.clone()
.with(Protocol::P2p(self.local_peer_id.clone().into()));
info!(target: "sub-libp2p", "Discovered external node address: {}", new_addr);
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
info!(target: "sub-libp2p", "No longer listening on {}", addr);
}
fn poll(
&mut self,
params: &mut PollParameters,
@@ -22,6 +22,7 @@ use libp2p::core::{
protocols_handler::IntoProtocolsHandler,
protocols_handler::KeepAlive,
protocols_handler::ProtocolsHandlerUpgrErr,
protocols_handler::SubstreamProtocol,
upgrade::{InboundUpgrade, OutboundUpgrade}
};
use log::{debug, error, warn};
@@ -405,7 +406,7 @@ where
if incoming.is_empty() {
if let Endpoint::Dialer = endpoint {
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: self.protocol.clone(),
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
});
}
@@ -615,7 +616,7 @@ where
// after all the substreams are closed.
if reenable && shutdown.is_empty() {
return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: self.protocol.clone(),
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
});
ProtocolState::Opening {
@@ -746,7 +747,7 @@ where
}
state.pending_messages.push(message);
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: self.protocol.clone(),
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: ()
});
}
@@ -771,7 +772,7 @@ where
}
state.pending_messages.push(message);
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade: self.protocol.clone(),
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: ()
});
}
@@ -793,8 +794,8 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
type OutboundProtocol = RegisteredProtocol<TMessage>;
type OutboundOpenInfo = ();
fn listen_protocol(&self) -> Self::InboundProtocol {
self.protocol.clone()
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(self.protocol.clone())
}
fn inject_fully_negotiated_inbound(
@@ -845,13 +846,13 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
ProtocolState::Init { .. } | ProtocolState::Opening { .. } => {}
ProtocolState::BackCompat { .. } | ProtocolState::Normal { .. } =>
keep_forever = true,
ProtocolState::Disabled { .. } | ProtocolState::Poisoned => return KeepAlive::Now,
ProtocolState::Disabled { .. } | ProtocolState::Poisoned => return KeepAlive::No,
}
if keep_forever {
KeepAlive::Forever
KeepAlive::Yes
} else {
KeepAlive::Now
KeepAlive::No
}
}
@@ -22,7 +22,7 @@ use crate::custom_proto::{CustomMessage, RegisteredProtocol};
use crate::{NetworkConfiguration, NonReservedPeerMode, parse_str_addr};
use fnv::FnvHashMap;
use futures::{prelude::*, Stream};
use libp2p::{multiaddr::Protocol, Multiaddr, core::swarm::NetworkBehaviour, PeerId};
use libp2p::{Multiaddr, core::swarm::NetworkBehaviour, PeerId};
use libp2p::core::{Swarm, nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox};
use libp2p::core::nodes::ConnectedPoint;
use log::{debug, info, warn};
@@ -84,6 +84,7 @@ where TMessage: CustomMessage + Send + 'static {
let local_identity = config.node_key.clone().into_keypair()?;
let local_public = local_identity.public();
let local_peer_id = local_public.clone().into_peer_id();
info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58());
// Build the swarm.
let (mut swarm, bandwidth) = {
@@ -95,12 +96,8 @@ where TMessage: CustomMessage + Send + 'static {
// Listen on multiaddresses.
for addr in &config.listen_addresses {
match Swarm::listen_on(&mut swarm, addr.clone()) {
Ok(mut new_addr) => {
new_addr.append(Protocol::P2p(local_peer_id.clone().into()));
info!(target: "sub-libp2p", "Local node address is: {}", new_addr);
},
Err(err) => warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
if let Err(err) = Swarm::listen_on(&mut swarm, addr.clone()) {
warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err)
}
}
+18 -12
View File
@@ -17,29 +17,35 @@
use futures::{future, stream, prelude::*, try_ready};
use rand::seq::SliceRandom;
use std::io;
use substrate_network_libp2p::{CustomMessage, multiaddr::Protocol, ServiceEvent, build_multiaddr};
use substrate_network_libp2p::{CustomMessage, Multiaddr, multiaddr::Protocol, ServiceEvent, build_multiaddr};
/// Builds two services. The second one and further have the first one as its bootstrap node.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
fn build_nodes<TMsg>(num: usize) -> Vec<substrate_network_libp2p::Service<TMsg>>
fn build_nodes<TMsg>(num: usize, base_port: u16) -> Vec<substrate_network_libp2p::Service<TMsg>>
where TMsg: CustomMessage + Send + 'static
{
let mut result: Vec<substrate_network_libp2p::Service<_>> = Vec::with_capacity(num);
let mut first_addr = None::<Multiaddr>;
for _ in 0 .. num {
for index in 0 .. num {
let mut boot_nodes = Vec::new();
if !result.is_empty() {
let mut bootnode = result[0].listeners().next().unwrap().clone();
bootnode.append(Protocol::P2p(result[0].peer_id().clone().into()));
boot_nodes.push(bootnode.to_string());
if let Some(first_addr) = first_addr.as_ref() {
boot_nodes.push(first_addr.clone()
.with(Protocol::P2p(result[0].peer_id().clone().into()))
.to_string());
}
let config = substrate_network_libp2p::NetworkConfiguration {
listen_addresses: vec![build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(0u16)]],
listen_addresses: vec![build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(base_port + index as u16)]],
boot_nodes,
..substrate_network_libp2p::NetworkConfiguration::default()
};
if first_addr.is_none() {
first_addr = Some(config.listen_addresses.iter().next().unwrap().clone());
}
let proto = substrate_network_libp2p::RegisteredProtocol::new(&b"tst"[..], &[1]);
result.push(substrate_network_libp2p::start_service(config, proto).unwrap().0);
}
@@ -50,7 +56,7 @@ fn build_nodes<TMsg>(num: usize) -> Vec<substrate_network_libp2p::Service<TMsg>>
#[test]
fn basic_two_nodes_connectivity() {
let (mut service1, mut service2) = {
let mut l = build_nodes::<Vec<u8>>(2).into_iter();
let mut l = build_nodes::<Vec<u8>>(2, 50400).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
@@ -90,7 +96,7 @@ fn two_nodes_transfer_lots_of_packets() {
const NUM_PACKETS: u32 = 5000;
let (mut service1, mut service2) = {
let mut l = build_nodes::<Vec<u8>>(2).into_iter();
let mut l = build_nodes::<Vec<u8>>(2, 50450).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
@@ -138,7 +144,7 @@ fn many_nodes_connectivity() {
// increased in the `NetworkConfiguration`.
const NUM_NODES: usize = 25;
let mut futures = build_nodes::<Vec<u8>>(NUM_NODES)
let mut futures = build_nodes::<Vec<u8>>(NUM_NODES, 50500)
.into_iter()
.map(move |mut node| {
let mut num_connecs = 0;
@@ -194,7 +200,7 @@ fn many_nodes_connectivity() {
#[test]
fn basic_two_nodes_requests_in_parallel() {
let (mut service1, mut service2) = {
let mut l = build_nodes::<(Option<u64>, Vec<u8>)>(2).into_iter();
let mut l = build_nodes::<(Option<u64>, Vec<u8>)>(2, 50550).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)