Allow fallback names for protocols (#8682)

* Allow fallback names for protocols

* Apply suggestions from code review

Co-authored-by: Roman Proskuryakov <humbug@deeptown.org>

* Fix some issues

* Fix compilation after merging master

Co-authored-by: Roman Proskuryakov <humbug@deeptown.org>
This commit is contained in:
Pierre Krieger
2021-05-06 16:01:01 +02:00
committed by GitHub
parent db69eb04bb
commit 62650989a3
18 changed files with 340 additions and 115 deletions
@@ -67,7 +67,16 @@ pub enum Event {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
/// This is always equal to the value of
/// [`crate::config::NonDefaultSetConfig::notifications_protocol`] of one of the
/// configured sets.
protocol: Cow<'static, str>,
/// If the negotiation didn't use the main name of the protocol (the one in
/// `notifications_protocol`), then this field contains which name has actually been
/// used.
/// Always contains a value equal to the value in
/// [`crate::config::NonDefaultSetConfig::fallback_names`].
negotiated_fallback: Option<Cow<'static, str>>,
/// Role of the remote.
role: ObservedRole,
},
@@ -19,7 +19,7 @@
//! Implementation of libp2p's `NetworkBehaviour` trait that establishes communications and opens
//! notifications substreams.
pub use self::behaviour::{Notifications, NotificationsOut};
pub use self::behaviour::{Notifications, NotificationsOut, ProtocolConfig};
pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready};
mod behaviour;
@@ -17,7 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::protocol::notifications::{
handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}
handler::{self, NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}
};
use bytes::BytesMut;
@@ -95,10 +95,8 @@ use wasm_timer::Instant;
/// accommodates for any number of connections.
///
pub struct Notifications {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>,
/// Notification protocols. Entries never change after initialization.
notif_protocols: Vec<handler::ProtocolConfig>,
/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
@@ -130,6 +128,19 @@ pub struct Notifications {
events: VecDeque<NetworkBehaviourAction<NotifsHandlerIn, NotificationsOut>>,
}
/// Configuration for a notifications protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
/// Name of the protocol.
pub name: Cow<'static, str>,
/// Names of the protocol to use if the main one isn't available.
pub fallback_names: Vec<Cow<'static, str>>,
/// Handshake of the protocol.
pub handshake: Vec<u8>,
/// Maximum allowed size for a notification.
pub max_notification_size: u64,
}
/// Identifier for a delay firing.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DelayId(u64);
@@ -311,6 +322,9 @@ pub enum NotificationsOut {
peer_id: PeerId,
/// Peerset set ID the substream is tied to.
set_id: sc_peerset::SetId,
/// If `Some`, a fallback protocol name has been used rather the main protocol name.
/// Always matches one of the fallback names passed at initialization.
negotiated_fallback: Option<Cow<'static, str>>,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this is out of the concern of this code.
received_handshake: Vec<u8>,
@@ -358,10 +372,15 @@ impl Notifications {
/// Creates a `CustomProtos`.
pub fn new(
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
notif_protocols: impl Iterator<Item = ProtocolConfig>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs, sz)| (n, Arc::new(RwLock::new(hs)), sz))
.map(|cfg| handler::ProtocolConfig {
name: cfg.name,
fallback_names: cfg.fallback_names,
handshake: Arc::new(RwLock::new(cfg.handshake)),
max_notification_size: cfg.max_notification_size,
})
.collect::<Vec<_>>();
assert!(!notif_protocols.is_empty());
@@ -385,7 +404,7 @@ impl Notifications {
handshake_message: impl Into<Vec<u8>>
) {
if let Some(p) = self.notif_protocols.get_mut(usize::from(set_id)) {
*p.1.write() = handshake_message.into();
*p.handshake.write() = handshake_message.into();
} else {
log::error!(target: "sub-libp2p", "Unknown handshake change set: {:?}", set_id);
debug_assert!(false);
@@ -1728,7 +1747,9 @@ impl NetworkBehaviour for Notifications {
}
}
NotifsHandlerOut::OpenResultOk { protocol_index, received_handshake, notifications_sink, .. } => {
NotifsHandlerOut::OpenResultOk {
protocol_index, negotiated_fallback, received_handshake, notifications_sink, ..
} => {
let set_id = sc_peerset::SetId::from(protocol_index);
trace!(target: "sub-libp2p",
"Handler({}, {:?}) => OpenResultOk({:?})",
@@ -1748,6 +1769,7 @@ impl NetworkBehaviour for Notifications {
let event = NotificationsOut::CustomProtocolOpen {
peer_id: source,
set_id,
negotiated_fallback,
received_handshake,
notifications_sink: notifications_sink.clone(),
};
@@ -110,7 +110,7 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
pub struct NotifsHandlerProto {
/// Name of protocols, prototypes for upgrades for inbound substreams, and the message we
/// send or respond with in the handshake.
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>, u64)>,
protocols: Vec<ProtocolConfig>,
}
/// The actual handler once the connection has been established.
@@ -135,20 +135,27 @@ pub struct NotifsHandler {
>,
}
/// Configuration for a notifications protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
/// Name of the protocol.
pub name: Cow<'static, str>,
/// Names of the protocol to use if the main one isn't available.
pub fallback_names: Vec<Cow<'static, str>>,
/// Handshake of the protocol. The `RwLock` is locked every time a new substream is opened.
pub handshake: Arc<RwLock<Vec<u8>>>,
/// Maximum allowed size for a notification.
pub max_notification_size: u64,
}
/// Fields specific for each individual protocol.
struct Protocol {
/// Name of the protocol.
name: Cow<'static, str>,
/// Other fields.
config: ProtocolConfig,
/// Prototype for the inbound upgrade.
in_upgrade: NotificationsIn,
/// Handshake to send when opening a substream or receiving an open request.
handshake: Arc<RwLock<Vec<u8>>>,
/// Maximum allowed size of individual notifications.
max_notification_size: u64,
/// Current state of the substreams for this protocol.
state: State,
}
@@ -214,21 +221,25 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
fn inbound_protocol(&self) -> UpgradeCollec<NotificationsIn> {
self.protocols.iter()
.map(|(_, p, _, _)| p.clone())
.map(|cfg| NotificationsIn::new(cfg.name.clone(), cfg.fallback_names.clone(), cfg.max_notification_size))
.collect::<UpgradeCollec<_>>()
}
fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
NotifsHandler {
protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake, max_size)| {
protocols: self.protocols.into_iter().map(|config| {
let in_upgrade = NotificationsIn::new(
config.name.clone(),
config.fallback_names.clone(),
config.max_notification_size
);
Protocol {
name,
config,
in_upgrade,
handshake,
state: State::Closed {
pending_opening: false,
},
max_notification_size: max_size,
}
}).collect(),
peer_id: peer_id.clone(),
@@ -271,6 +282,8 @@ pub enum NotifsHandlerOut {
OpenResultOk {
/// Index of the protocol in the list of protocols passed at initialization.
protocol_index: usize,
/// Name of the protocol that was actually negotiated, if the default one wasn't available.
negotiated_fallback: Option<Cow<'static, str>>,
/// The endpoint of the connection that is open for custom protocols.
endpoint: ConnectedPoint,
/// Handshake that was sent to us.
@@ -445,18 +458,10 @@ impl NotifsHandlerProto {
/// is always the same whether we open a substream ourselves or respond to handshake from
/// the remote.
pub fn new(
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>>,
list: impl Into<Vec<ProtocolConfig>>,
) -> Self {
let protocols = list
.into()
.into_iter()
.map(|(proto_name, msg, max_notif_size)| {
(proto_name.clone(), NotificationsIn::new(proto_name, max_notif_size), msg, max_notif_size)
})
.collect();
NotifsHandlerProto {
protocols,
protocols: list.into(),
}
}
}
@@ -481,7 +486,7 @@ impl ProtocolsHandler for NotifsHandler {
fn inject_fully_negotiated_inbound(
&mut self,
((_remote_handshake, mut new_substream), protocol_index):
(mut in_substream_open, protocol_index):
<Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
(): ()
) {
@@ -495,7 +500,7 @@ impl ProtocolsHandler for NotifsHandler {
));
protocol_info.state = State::OpenDesiredByRemote {
in_substream: new_substream,
in_substream: in_substream_open.substream,
pending_opening,
};
},
@@ -518,16 +523,16 @@ impl ProtocolsHandler for NotifsHandler {
// Create `handshake_message` on a separate line to be sure that the
// lock is released as soon as possible.
let handshake_message = protocol_info.handshake.read().clone();
new_substream.send_handshake(handshake_message);
*in_substream = Some(new_substream);
let handshake_message = protocol_info.config.handshake.read().clone();
in_substream_open.substream.send_handshake(handshake_message);
*in_substream = Some(in_substream_open.substream);
},
}
}
fn inject_fully_negotiated_outbound(
&mut self,
(handshake, substream): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
new_open: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
protocol_index: Self::OutboundOpenInfo
) {
match self.protocols[protocol_index].state {
@@ -553,15 +558,16 @@ impl ProtocolsHandler for NotifsHandler {
self.protocols[protocol_index].state = State::Open {
notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
out_substream: Some(substream),
out_substream: Some(new_open.substream),
in_substream: in_substream.take(),
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::OpenResultOk {
protocol_index,
negotiated_fallback: new_open.negotiated_fallback,
endpoint: self.endpoint.clone(),
received_handshake: handshake,
received_handshake: new_open.handshake,
notifications_sink
}
));
@@ -577,9 +583,10 @@ impl ProtocolsHandler for NotifsHandler {
State::Closed { pending_opening } => {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.handshake.read().clone(),
protocol_info.max_notification_size
protocol_info.config.name.clone(),
protocol_info.config.fallback_names.clone(),
protocol_info.config.handshake.read().clone(),
protocol_info.config.max_notification_size
);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
@@ -593,13 +600,14 @@ impl ProtocolsHandler for NotifsHandler {
};
},
State::OpenDesiredByRemote { pending_opening, in_substream } => {
let handshake_message = protocol_info.handshake.read().clone();
let handshake_message = protocol_info.config.handshake.read().clone();
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.config.name.clone(),
protocol_info.config.fallback_names.clone(),
handshake_message.clone(),
protocol_info.max_notification_size,
protocol_info.config.max_notification_size,
);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
@@ -18,7 +18,7 @@
#![cfg(test)]
use crate::protocol::notifications::{Notifications, NotificationsOut};
use crate::protocol::notifications::{Notifications, NotificationsOut, ProtocolConfig};
use futures::prelude::*;
use libp2p::{PeerId, Multiaddr, Transport};
@@ -80,7 +80,12 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});
let behaviour = CustomProtoWithAddr {
inner: Notifications::new(peerset, iter::once(("/foo".into(), Vec::new(), 1024 * 1024))),
inner: Notifications::new(peerset, iter::once(ProtocolConfig {
name: "/foo".into(),
fallback_names: Vec::new(),
handshake: Vec::new(),
max_notification_size: 1024 * 1024
})),
addrs: addrs
.iter()
.enumerate()
@@ -19,8 +19,10 @@
pub use self::collec::UpgradeCollec;
pub use self::notifications::{
NotificationsIn,
NotificationsInOpen,
NotificationsInSubstream,
NotificationsOut,
NotificationsOutOpen,
NotificationsOutSubstream,
NotificationsHandshakeError,
NotificationsOutError,
@@ -41,7 +41,7 @@ use futures::prelude::*;
use asynchronous_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
use log::error;
use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, iter, mem, pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, mem, pin::Pin, task::{Context, Poll}, vec};
use unsigned_varint::codec::UviBytes;
/// Maximum allowed size of the two handshake messages, in bytes.
@@ -52,7 +52,8 @@ const MAX_HANDSHAKE_SIZE: usize = 1024;
#[derive(Debug, Clone)]
pub struct NotificationsIn {
/// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, str>,
/// The first one is the main name, while the other ones are fall backs.
protocol_names: Vec<Cow<'static, str>>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
}
@@ -62,7 +63,8 @@ pub struct NotificationsIn {
#[derive(Debug, Clone)]
pub struct NotificationsOut {
/// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, str>,
/// The first one is the main name, while the other ones are fall backs.
protocol_names: Vec<Cow<'static, str>>,
/// Message to send when we start the handshake.
initial_message: Vec<u8>,
/// Maximum allowed size for a single notification.
@@ -106,51 +108,54 @@ pub struct NotificationsOutSubstream<TSubstream> {
impl NotificationsIn {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, str>>, max_notification_size: u64) -> Self {
pub fn new(
main_protocol_name: impl Into<Cow<'static, str>>,
fallback_names: Vec<Cow<'static, str>>,
max_notification_size: u64
) -> Self {
let mut protocol_names = fallback_names;
protocol_names.insert(0, main_protocol_name.into());
NotificationsIn {
protocol_name: protocol_name.into(),
protocol_names,
max_notification_size,
}
}
}
impl UpgradeInfo for NotificationsIn {
type Info = Cow<'static, [u8]>;
type InfoIter = iter::Once<Self::Info>;
type Info = StringProtocolName;
type InfoIter = vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
let bytes: Cow<'static, [u8]> = match &self.protocol_name {
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec())
};
iter::once(bytes)
self.protocol_names.iter().cloned().map(StringProtocolName).collect::<Vec<_>>().into_iter()
}
}
impl<TSubstream> InboundUpgrade<TSubstream> for NotificationsIn
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = (Vec<u8>, NotificationsInSubstream<TSubstream>);
type Output = NotificationsInOpen<TSubstream>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Error = NotificationsHandshakeError;
fn upgrade_inbound(
self,
mut socket: TSubstream,
_: Self::Info,
negotiated_name: Self::Info,
) -> Self::Future {
Box::pin(async move {
let initial_message_len = unsigned_varint::aio::read_usize(&mut socket).await?;
if initial_message_len > MAX_HANDSHAKE_SIZE {
let handshake_len = unsigned_varint::aio::read_usize(&mut socket).await?;
if handshake_len > MAX_HANDSHAKE_SIZE {
return Err(NotificationsHandshakeError::TooLarge {
requested: initial_message_len,
requested: handshake_len,
max: MAX_HANDSHAKE_SIZE,
});
}
let mut initial_message = vec![0u8; initial_message_len];
if !initial_message.is_empty() {
socket.read_exact(&mut initial_message).await?;
let mut handshake = vec![0u8; handshake_len];
if !handshake.is_empty() {
socket.read_exact(&mut handshake).await?;
}
let mut codec = UviBytes::default();
@@ -161,11 +166,30 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
handshake: NotificationsInSubstreamHandshake::NotSent,
};
Ok((initial_message, substream))
Ok(NotificationsInOpen {
handshake,
negotiated_fallback: if negotiated_name.0 == self.protocol_names[0] {
None
} else {
Some(negotiated_name.0)
},
substream,
})
})
}
}
/// Yielded by the [`NotificationsIn`] after a successfuly upgrade.
pub struct NotificationsInOpen<TSubstream> {
/// Handshake sent by the remote.
pub handshake: Vec<u8>,
/// If the negotiated name is not the "main" protocol name but a fallback, contains the
/// name of the negotiated fallback.
pub negotiated_fallback: Option<Cow<'static, str>>,
/// Implementation of `Stream` that allows receives messages from the substream.
pub substream: NotificationsInSubstream<TSubstream>,
}
impl<TSubstream> NotificationsInSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin,
{
@@ -296,7 +320,8 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
impl NotificationsOut {
/// Builds a new potential upgrade.
pub fn new(
protocol_name: impl Into<Cow<'static, str>>,
main_protocol_name: impl Into<Cow<'static, str>>,
fallback_names: Vec<Cow<'static, str>>,
initial_message: impl Into<Vec<u8>>,
max_notification_size: u64,
) -> Self {
@@ -305,38 +330,47 @@ impl NotificationsOut {
error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
}
let mut protocol_names = fallback_names;
protocol_names.insert(0, main_protocol_name.into());
NotificationsOut {
protocol_name: protocol_name.into(),
protocol_names,
initial_message,
max_notification_size,
}
}
}
/// Implementation of the `ProtocolName` trait, where the protocol name is a string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StringProtocolName(Cow<'static, str>);
impl upgrade::ProtocolName for StringProtocolName {
fn protocol_name(&self) -> &[u8] {
self.0.as_bytes()
}
}
impl UpgradeInfo for NotificationsOut {
type Info = Cow<'static, [u8]>;
type InfoIter = iter::Once<Self::Info>;
type Info = StringProtocolName;
type InfoIter = vec::IntoIter<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
let bytes: Cow<'static, [u8]> = match &self.protocol_name {
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec())
};
iter::once(bytes)
self.protocol_names.iter().cloned().map(StringProtocolName).collect::<Vec<_>>().into_iter()
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for NotificationsOut
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = (Vec<u8>, NotificationsOutSubstream<TSubstream>);
type Output = NotificationsOutOpen<TSubstream>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
type Error = NotificationsHandshakeError;
fn upgrade_outbound(
self,
mut socket: TSubstream,
_: Self::Info,
negotiated_name: Self::Info,
) -> Self::Future {
Box::pin(async move {
upgrade::write_with_len_prefix(&mut socket, &self.initial_message).await?;
@@ -358,13 +392,32 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
let mut codec = UviBytes::default();
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));
Ok((handshake, NotificationsOutSubstream {
socket: Framed::new(socket, codec),
}))
Ok(NotificationsOutOpen {
handshake,
negotiated_fallback: if negotiated_name.0 == self.protocol_names[0] {
None
} else {
Some(negotiated_name.0)
},
substream: NotificationsOutSubstream {
socket: Framed::new(socket, codec),
}
})
})
}
}
/// Yielded by the [`NotificationsOut`] after a successfuly upgrade.
pub struct NotificationsOutOpen<TSubstream> {
/// Handshake returned by the remote.
pub handshake: Vec<u8>,
/// If the negotiated name is not the "main" protocol name but a fallback, contains the
/// name of the negotiated fallback.
pub negotiated_fallback: Option<Cow<'static, str>>,
/// Implementation of `Sink` that allows sending messages on the substream.
pub substream: NotificationsOutSubstream<TSubstream>,
}
impl<TSubstream> Sink<Vec<u8>> for NotificationsOutSubstream<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin,
{
@@ -436,7 +489,7 @@ pub enum NotificationsOutError {
#[cfg(test)]
mod tests {
use super::{NotificationsIn, NotificationsOut};
use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen};
use async_std::net::{TcpListener, TcpStream};
use futures::{prelude::*, channel::oneshot};
@@ -450,9 +503,9 @@ mod tests {
let client = async_std::task::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let (handshake, mut substream) = upgrade::apply_outbound(
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1
).await.unwrap();
@@ -465,12 +518,12 @@ mod tests {
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await.unwrap();
assert_eq!(initial_message, b"initial message");
assert_eq!(handshake, b"initial message");
substream.send_handshake(&b"hello world"[..]);
let msg = substream.next().await.unwrap().unwrap();
@@ -489,9 +542,9 @@ mod tests {
let client = async_std::task::spawn(async move {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let (handshake, mut substream) = upgrade::apply_outbound(
let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, vec![], 1024 * 1024),
NotificationsOut::new(PROTO_NAME, Vec::new(), vec![], 1024 * 1024),
upgrade::Version::V1
).await.unwrap();
@@ -504,12 +557,12 @@ mod tests {
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await.unwrap();
assert!(initial_message.is_empty());
assert!(handshake.is_empty());
substream.send_handshake(vec![]);
let msg = substream.next().await.unwrap().unwrap();
@@ -528,7 +581,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let outcome = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"hello"[..], 1024 * 1024),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"hello"[..], 1024 * 1024),
upgrade::Version::V1
).await;
@@ -543,12 +596,12 @@ mod tests {
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let (initial_msg, substream) = upgrade::apply_inbound(
let NotificationsInOpen { handshake, substream, .. } = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await.unwrap();
assert_eq!(initial_msg, b"hello");
assert_eq!(handshake, b"hello");
// We successfully upgrade to the protocol, but then close the substream.
drop(substream);
@@ -567,7 +620,7 @@ mod tests {
let ret = upgrade::apply_outbound(
socket,
// We check that an initial message that is too large gets refused.
NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024),
NotificationsOut::new(PROTO_NAME, Vec::new(), (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024),
upgrade::Version::V1
).await;
assert!(ret.is_err());
@@ -580,7 +633,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let ret = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await;
assert!(ret.is_err());
});
@@ -597,7 +650,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1
).await;
assert!(ret.is_err());
@@ -608,11 +661,11 @@ mod tests {
listener_addr_tx.send(listener.local_addr().unwrap()).unwrap();
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024)
).await.unwrap();
assert_eq!(initial_message, b"initial message");
assert_eq!(handshake, b"initial message");
// We check that a handshake that is too large gets refused.
substream.send_handshake((0..32768).map(|_| 0).collect::<Vec<_>>());