Upgrade to libp2p-0.28. (#7077)

* Upgrade to libp2p-0.28

* Clean up test imports.

* CI

* CI

* CI?

* CI once more.

* One more.

* CI

* CI

* CI
This commit is contained in:
Roman Borschel
2020-09-14 16:27:58 +02:00
committed by GitHub
parent 96901b9662
commit 51706a7620
24 changed files with 181 additions and 268 deletions
@@ -394,25 +394,27 @@ impl ProtocolsHandler for NotifsHandler {
type OutboundProtocol = EitherUpgrade<NotificationsOut, RegisteredProtocol>;
// Index within the `out_handlers`; None for legacy
type OutboundOpenInfo = Option<usize>;
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
let in_handlers = self.in_handlers.iter()
.map(|(h, _)| h.listen_protocol().into_upgrade().1)
.collect::<UpgradeCollec<_>>();
let proto = SelectUpgrade::new(in_handlers, self.legacy.listen_protocol().into_upgrade().1);
SubstreamProtocol::new(proto)
SubstreamProtocol::new(proto, ())
}
fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
out: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
match out {
EitherOutput::First((out, num)) =>
self.in_handlers[num].0.inject_fully_negotiated_inbound(out),
self.in_handlers[num].0.inject_fully_negotiated_inbound(out, ()),
EitherOutput::Second(out) =>
self.legacy.inject_fully_negotiated_inbound(out),
self.legacy.inject_fully_negotiated_inbound(out, ()),
}
}
@@ -619,10 +621,11 @@ impl ProtocolsHandler for NotifsHandler {
if self.pending_legacy_handshake.is_none() {
while let Poll::Ready(ev) = self.legacy.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
protocol: protocol
.map_upgrade(EitherUpgrade::B)
.map_info(|()| None)
}),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen {
received_handshake,
@@ -705,10 +708,11 @@ impl ProtocolsHandler for NotifsHandler {
for (handler_num, (handler, _)) in self.out_handlers.iter_mut().enumerate() {
while let Poll::Ready(ev) = handler.poll(cx) {
match ev {
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>
ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol } =>
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::A),
info: Some(handler_num),
protocol: protocol
.map_upgrade(EitherUpgrade::A)
.map_info(|()| Some(handler_num))
}),
ProtocolsHandlerEvent::Close(err) => void::unreachable(err),
@@ -253,8 +253,7 @@ impl LegacyProtoHandler {
if incoming.is_empty() {
if let ConnectedPoint::Dialer { .. } = self.endpoint {
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
protocol: SubstreamProtocol::new(self.protocol.clone(), ()),
});
}
ProtocolState::Opening {
@@ -428,8 +427,7 @@ impl LegacyProtoHandler {
deadline: Delay::new(Duration::from_secs(60))
};
Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
protocol: SubstreamProtocol::new(self.protocol.clone(), ()),
})
} else {
self.state = ProtocolState::Disabled { shutdown, reenable };
@@ -498,14 +496,16 @@ impl ProtocolsHandler for LegacyProtoHandler {
type InboundProtocol = RegisteredProtocol;
type OutboundProtocol = RegisteredProtocol;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(self.protocol.clone())
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(self.protocol.clone(), ())
}
fn inject_fully_negotiated_inbound(
&mut self,
(substream, handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
(substream, handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
self.inject_fully_negotiated(substream, handshake);
}
@@ -175,14 +175,16 @@ impl ProtocolsHandler for NotifsInHandler {
type InboundProtocol = NotificationsIn;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(self.in_protocol.clone())
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(self.in_protocol.clone(), ())
}
fn inject_fully_negotiated_inbound(
&mut self,
(msg, proto): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
(msg, proto): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
// If a substream already exists, we drop it and replace it with the new incoming one.
if self.substream.is_some() {
@@ -267,14 +267,16 @@ impl ProtocolsHandler for NotifsOutHandler {
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = NotificationsOut;
type OutboundOpenInfo = ();
type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
SubstreamProtocol::new(DeniedUpgrade)
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
SubstreamProtocol::new(DeniedUpgrade, ())
}
fn inject_fully_negotiated_inbound(
&mut self,
proto: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
proto: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
// We should never reach here. `proto` is a `Void`.
void::unreachable(proto)
@@ -309,8 +311,7 @@ impl ProtocolsHandler for NotifsOutHandler {
State::Disabled => {
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone());
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
self.state = State::Opening { initial_message };
},
@@ -329,8 +330,7 @@ impl ProtocolsHandler for NotifsOutHandler {
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message.clone());
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
self.state = State::Opening { initial_message };
},
@@ -414,8 +414,7 @@ impl ProtocolsHandler for NotifsOutHandler {
self.state = State::Opening { initial_message: initial_message.clone() };
let proto = NotificationsOut::new(self.protocol_name.clone(), initial_message);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(proto).with_timeout(OPEN_TIMEOUT),
info: (),
protocol: SubstreamProtocol::new(proto, ()).with_timeout(OPEN_TIMEOUT),
});
return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsOutHandlerOut::Closed));
}
@@ -20,7 +20,14 @@ use crate::protocol::generic_proto::{GenericProto, GenericProtoOut};
use futures::prelude::*;
use libp2p::{PeerId, Multiaddr, Transport};
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint};
use libp2p::core::{
connection::{ConnectionId, ListenerId},
ConnectedPoint,
muxing,
transport::MemoryTransport,
upgrade
};
use libp2p::{identity, noise, yamux};
use libp2p::swarm::{
Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters,
NetworkBehaviour, NetworkBehaviourAction
@@ -32,7 +39,7 @@ use std::{error, io, task::Context, task::Poll, time::Duration};
fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
let mut out = Vec::with_capacity(2);
let keypairs: Vec<_> = (0..2).map(|_| libp2p::identity::Keypair::generate_ed25519()).collect();
let keypairs: Vec<_> = (0..2).map(|_| identity::Keypair::generate_ed25519()).collect();
let addrs: Vec<Multiaddr> = (0..2)
.map(|_| format!("/memory/{}", rand::random::<u64>()).parse().unwrap())
.collect();
@@ -40,25 +47,16 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
for index in 0 .. 2 {
let keypair = keypairs[index].clone();
let local_peer_id = keypair.public().into_peer_id();
let transport = libp2p::core::transport::MemoryTransport
.and_then(move |out, endpoint| {
let secio = libp2p::secio::SecioConfig::new(keypair);
libp2p::core::upgrade::apply(
out,
secio,
endpoint,
libp2p::core::upgrade::Version::V1
)
})
.and_then(move |(peer_id, stream), endpoint| {
libp2p::core::upgrade::apply(
stream,
libp2p::yamux::Config::default(),
endpoint,
libp2p::core::upgrade::Version::V1
)
.map_ok(|muxer| (peer_id, libp2p::core::muxing::StreamMuxerBox::new(muxer)))
})
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&keypair)
.unwrap();
let transport = MemoryTransport
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(yamux::Config::default())
.map(|(peer, muxer), _| (peer, muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();