Add explicit limits to notifications sizes and adjust yamux buffer size (#7925)

* Add explicit limits to notifications sizes and adjust yamux buffer size

* Docfix

* Tests

* Document these 10 bytes
This commit is contained in:
Pierre Krieger
2021-01-19 12:00:37 +01:00
committed by GitHub
parent bb46f8ac30
commit 8e04515912
12 changed files with 125 additions and 51 deletions
@@ -103,7 +103,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>,
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>,
/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
@@ -374,10 +374,10 @@ impl GenericProto {
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>)>,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs)| (n, Arc::new(RwLock::new(hs))))
.map(|(n, hs, sz)| (n, Arc::new(RwLock::new(hs)), sz))
.collect::<Vec<_>>();
assert!(!notif_protocols.is_empty());
@@ -113,7 +113,7 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
pub struct NotifsHandlerProto {
/// Name of protocols, prototypes for upgrades for inbound substreams, and the message we
/// send or respond with in the handshake.
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>)>,
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>, u64)>,
/// Configuration for the legacy protocol upgrade.
legacy_protocol: RegisteredProtocol,
@@ -161,6 +161,9 @@ struct Protocol {
/// Handshake to send when opening a substream or receiving an open request.
handshake: Arc<RwLock<Vec<u8>>>,
/// Maximum allowed size of individual notifications.
max_notification_size: u64,
/// Current state of the substreams for this protocol.
state: State,
}
@@ -226,7 +229,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
fn inbound_protocol(&self) -> SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol> {
let protocols = self.protocols.iter()
.map(|(_, p, _)| p.clone())
.map(|(_, p, _, _)| p.clone())
.collect::<UpgradeCollec<_>>();
SelectUpgrade::new(protocols, self.legacy_protocol.clone())
@@ -234,14 +237,15 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
NotifsHandler {
protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake)| {
protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake, max_size)| {
Protocol {
name,
in_upgrade,
handshake,
state: State::Closed {
pending_opening: false,
}
},
max_notification_size: max_size,
}
}).collect(),
peer_id: peer_id.clone(),
@@ -467,18 +471,19 @@ pub enum NotifsHandlerError {
impl NotifsHandlerProto {
/// Builds a new handler.
///
/// `list` is a list of notification protocols names, and the message to send as part of the
/// handshake. At the moment, the message is always the same whether we open a substream
/// ourselves or respond to handshake from the remote.
/// `list` is a list of notification protocols names, the message to send as part of the
/// handshake, and the maximum allowed size of a notification. At the moment, the message
/// is always the same whether we open a substream ourselves or respond to handshake from
/// the remote.
pub fn new(
legacy_protocol: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>>,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>>,
) -> Self {
let protocols = list
.into()
.into_iter()
.map(|(proto_name, msg)| {
(proto_name.clone(), NotificationsIn::new(proto_name), msg)
.map(|(proto_name, msg, max_notif_size)| {
(proto_name.clone(), NotificationsIn::new(proto_name, max_notif_size), msg, max_notif_size)
})
.collect();
@@ -624,7 +629,8 @@ impl ProtocolsHandler for NotifsHandler {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.handshake.read().clone()
protocol_info.handshake.read().clone(),
protocol_info.max_notification_size
);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
@@ -643,7 +649,8 @@ impl ProtocolsHandler for NotifsHandler {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
handshake_message.clone()
handshake_message.clone(),
protocol_info.max_notification_size,
);
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
@@ -82,7 +82,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(
"test", &[1], vec![], peerset,
iter::once(("/foo".into(), Vec::new()))
iter::once(("/foo".into(), Vec::new(), 1024 * 1024))
),
addrs: addrs
.iter()
@@ -41,7 +41,7 @@ use futures::prelude::*;
use asynchronous_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
use log::error;
use std::{borrow::Cow, convert::Infallible, io, iter, mem, pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, iter, mem, pin::Pin, task::{Context, Poll}};
use unsigned_varint::codec::UviBytes;
/// Maximum allowed size of the two handshake messages, in bytes.
@@ -53,6 +53,8 @@ const MAX_HANDSHAKE_SIZE: usize = 1024;
pub struct NotificationsIn {
/// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, str>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
}
/// Upgrade that opens a substream, waits for the remote to accept by sending back a status
@@ -63,6 +65,8 @@ pub struct NotificationsOut {
protocol_name: Cow<'static, str>,
/// Message to send when we start the handshake.
initial_message: Vec<u8>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
}
/// A substream for incoming notification messages.
@@ -102,9 +106,10 @@ pub struct NotificationsOutSubstream<TSubstream> {
impl NotificationsIn {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, str>>, max_notification_size: u64) -> Self {
NotificationsIn {
protocol_name: protocol_name.into(),
max_notification_size,
}
}
}
@@ -148,8 +153,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
socket.read_exact(&mut initial_message).await?;
}
let mut codec = UviBytes::default();
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));
let substream = NotificationsInSubstream {
socket: Framed::new(socket, UviBytes::default()),
socket: Framed::new(socket, codec),
handshake: NotificationsInSubstreamHandshake::NotSent,
};
@@ -287,7 +295,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
impl NotificationsOut {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, str>>, initial_message: impl Into<Vec<u8>>) -> Self {
pub fn new(
protocol_name: impl Into<Cow<'static, str>>,
initial_message: impl Into<Vec<u8>>,
max_notification_size: u64,
) -> Self {
let initial_message = initial_message.into();
if initial_message.len() > MAX_HANDSHAKE_SIZE {
error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
@@ -296,6 +308,7 @@ impl NotificationsOut {
NotificationsOut {
protocol_name: protocol_name.into(),
initial_message,
max_notification_size,
}
}
}
@@ -342,8 +355,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
socket.read_exact(&mut handshake).await?;
}
let mut codec = UviBytes::default();
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));
Ok((handshake, NotificationsOutSubstream {
socket: Framed::new(socket, UviBytes::default()),
socket: Framed::new(socket, codec),
}))
})
}
@@ -436,7 +452,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let (handshake, mut substream) = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"initial message"[..]),
NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1
).await.unwrap();
@@ -451,7 +467,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await.unwrap();
assert_eq!(initial_message, b"initial message");
@@ -475,7 +491,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let (handshake, mut substream) = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, vec![]),
NotificationsOut::new(PROTO_NAME, vec![], 1024 * 1024),
upgrade::Version::V1
).await.unwrap();
@@ -490,7 +506,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await.unwrap();
assert!(initial_message.is_empty());
@@ -512,7 +528,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let outcome = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"hello"[..]),
NotificationsOut::new(PROTO_NAME, &b"hello"[..], 1024 * 1024),
upgrade::Version::V1
).await;
@@ -529,7 +545,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let (initial_msg, substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await.unwrap();
assert_eq!(initial_msg, b"hello");
@@ -551,7 +567,7 @@ mod tests {
let ret = upgrade::apply_outbound(
socket,
// We check that an initial message that is too large gets refused.
NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>()),
NotificationsOut::new(PROTO_NAME, (0..32768).map(|_| 0).collect::<Vec<_>>(), 1024 * 1024),
upgrade::Version::V1
).await;
assert!(ret.is_err());
@@ -564,7 +580,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let ret = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await;
assert!(ret.is_err());
});
@@ -581,7 +597,7 @@ mod tests {
let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap();
let ret = upgrade::apply_outbound(
socket,
NotificationsOut::new(PROTO_NAME, &b"initial message"[..]),
NotificationsOut::new(PROTO_NAME, &b"initial message"[..], 1024 * 1024),
upgrade::Version::V1
).await;
assert!(ret.is_err());
@@ -594,7 +610,7 @@ mod tests {
let (socket, _) = listener.accept().await.unwrap();
let (initial_message, mut substream) = upgrade::apply_inbound(
socket,
NotificationsIn::new(PROTO_NAME)
NotificationsIn::new(PROTO_NAME, 1024 * 1024)
).await.unwrap();
assert_eq!(initial_message, b"initial message");