client/*: Treat protocol name as str and not [u8] (#6967)

* client/*: Treat protocol name as str and not [u8]

Notification protocol names are in practice always valid utf8 strings.
Instead of treating them as such in the type system, thus far they were
casted to a [u8] at creation time.

With this commit protocol names are instead treated as valid utf8
strings throughout the codebase and passed as `Cow<'static, str>`
instead of `Cow<'static, [u8]>`. Among other things this eliminates the
need for string casting when logging.

* client/network: Don't allocate when protocol name is borrowed
This commit is contained in:
Max Inden
2020-08-28 17:34:25 +02:00
committed by GitHub
parent ff1e8150e1
commit 8fd343e39d
16 changed files with 87 additions and 76 deletions
@@ -120,7 +120,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>,
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>,
/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
@@ -322,7 +322,7 @@ pub enum GenericProtoOut {
/// Id of the peer the message came from.
peer_id: PeerId,
/// Engine corresponding to the message.
protocol_name: Cow<'static, [u8]>,
protocol_name: Cow<'static, str>,
/// Message that has been received.
message: BytesMut,
},
@@ -360,7 +360,7 @@ impl GenericProto {
/// will retain the protocols that were registered then, and not any new one.
pub fn register_notif_protocol(
&mut self,
protocol_name: impl Into<Cow<'static, [u8]>>,
protocol_name: impl Into<Cow<'static, str>>,
handshake_msg: impl Into<Vec<u8>>
) {
self.notif_protocols.push((protocol_name.into(), Arc::new(RwLock::new(handshake_msg.into()))));
@@ -371,10 +371,10 @@ impl GenericProto {
/// Has no effect if the protocol is unknown.
pub fn set_notif_protocol_handshake(
&mut self,
protocol_name: &[u8],
protocol_name: &str,
handshake_message: impl Into<Vec<u8>>
) {
if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) {
if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == protocol_name) {
*protocol.1.write() = handshake_message.into();
}
}
@@ -551,7 +551,7 @@ impl GenericProto {
pub fn write_notification(
&mut self,
target: &PeerId,
protocol_name: Cow<'static, [u8]>,
protocol_name: Cow<'static, str>,
message: impl Into<Vec<u8>>,
encoded_fallback_message: Vec<u8>,
) {
@@ -569,11 +569,11 @@ impl GenericProto {
target: "sub-libp2p",
"External API => Notification({:?}, {:?})",
target,
str::from_utf8(&protocol_name)
protocol_name,
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
notifs_sink.send_sync_notification(
&protocol_name,
protocol_name,
encoded_fallback_message,
message
);
@@ -1374,7 +1374,7 @@ impl NetworkBehaviour for GenericProto {
target: "sub-libp2p",
"Handler({:?}) => Notification({:?})",
source,
str::from_utf8(&protocol_name)
protocol_name,
);
trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source);
let event = GenericProtoOut::Notification {
@@ -224,7 +224,7 @@ pub enum NotifsHandlerOut {
/// Received a message on a custom protocol substream.
Notification {
/// Name of the protocol of the message.
protocol_name: Cow<'static, [u8]>,
protocol_name: Cow<'static, str>,
/// Message that has been received.
message: BytesMut,
@@ -270,7 +270,7 @@ enum NotificationsSinkMessage {
/// Message emitted by [`NotificationsSink::reserve_notification`] and
/// [`NotificationsSink::write_notification_now`].
Notification {
protocol_name: Vec<u8>,
protocol_name: Cow<'static, str>,
encoded_fallback_message: Vec<u8>,
message: Vec<u8>,
},
@@ -311,13 +311,13 @@ impl NotificationsSink {
/// This method will be removed in a future version.
pub fn send_sync_notification<'a>(
&'a self,
protocol_name: &[u8],
protocol_name: Cow<'static, str>,
encoded_fallback_message: impl Into<Vec<u8>>,
message: impl Into<Vec<u8>>
) {
let mut lock = self.inner.sync_channel.lock();
let result = lock.try_send(NotificationsSinkMessage::Notification {
protocol_name: protocol_name.to_owned(),
protocol_name: protocol_name,
encoded_fallback_message: encoded_fallback_message.into(),
message: message.into()
});
@@ -336,12 +336,12 @@ impl NotificationsSink {
///
/// The protocol name is expected to be checked ahead of calling this method. It is a logic
/// error to send a notification using an unknown protocol.
pub async fn reserve_notification<'a>(&'a self, protocol_name: &[u8]) -> Result<Ready<'a>, ()> {
pub async fn reserve_notification<'a>(&'a self, protocol_name: Cow<'static, str>) -> Result<Ready<'a>, ()> {
let mut lock = self.inner.async_channel.lock().await;
let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
if poll_ready.is_ok() {
Ok(Ready { protocol_name: protocol_name.to_owned(), lock })
Ok(Ready { protocol_name: protocol_name, lock })
} else {
Err(())
}
@@ -355,7 +355,7 @@ pub struct Ready<'a> {
/// Guarded channel. The channel inside is guaranteed to not be full.
lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
/// Name of the protocol. Should match one of the protocols passed at initialization.
protocol_name: Vec<u8>,
protocol_name: Cow<'static, str>,
}
impl<'a> Ready<'a> {
@@ -392,7 +392,7 @@ impl NotifsHandlerProto {
/// ourselves or respond to handshake from the remote.
pub fn new(
legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>>,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>>,
) -> Self {
let list = list.into();
@@ -613,7 +613,7 @@ impl ProtocolsHandler for NotifsHandler {
message
} => {
for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() == &protocol_name[..] && handler.is_open() {
if *handler.protocol_name() == protocol_name && handler.is_open() {
handler.send_or_discard(message);
continue 'poll_notifs_sink;
}
@@ -698,7 +698,7 @@ impl ProtocolsHandler for NotifsHandler {
if self.notifications_sink_rx.is_some() {
let msg = NotifsHandlerOut::Notification {
message,
protocol_name: handler.protocol_name().to_owned().into(),
protocol_name: handler.protocol_name().clone(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(msg));
}
@@ -109,7 +109,7 @@ pub enum NotifsInHandlerOut {
impl NotifsInHandlerProto {
/// Builds a new `NotifsInHandlerProto`.
pub fn new(
protocol_name: impl Into<Cow<'static, [u8]>>
protocol_name: impl Into<Cow<'static, str>>
) -> Self {
NotifsInHandlerProto {
in_protocol: NotificationsIn::new(protocol_name),
@@ -136,7 +136,7 @@ impl IntoProtocolsHandler for NotifsInHandlerProto {
impl NotifsInHandler {
/// Returns the name of the protocol that we accept.
pub fn protocol_name(&self) -> &[u8] {
pub fn protocol_name(&self) -> &Cow<'static, str> {
self.in_protocol.protocol_name()
}
}
@@ -57,13 +57,13 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
/// See the documentation of [`NotifsOutHandler`] for more information.
pub struct NotifsOutHandlerProto {
/// Name of the protocol to negotiate.
protocol_name: Cow<'static, [u8]>,
protocol_name: Cow<'static, str>,
}
impl NotifsOutHandlerProto {
/// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the
/// notifications substream.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
NotifsOutHandlerProto {
protocol_name: protocol_name.into(),
}
@@ -97,7 +97,7 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto {
/// the remote for the purpose of sending notifications to it.
pub struct NotifsOutHandler {
/// Name of the protocol to negotiate.
protocol_name: Cow<'static, [u8]>,
protocol_name: Cow<'static, str>,
/// Relationship with the node we're connected to.
state: State,
@@ -220,7 +220,7 @@ impl NotifsOutHandler {
}
/// Returns the name of the protocol that we negotiate.
pub fn protocol_name(&self) -> &[u8] {
pub fn protocol_name(&self) -> &Cow<'static, str> {
&self.protocol_name
}
@@ -50,7 +50,7 @@ const MAX_HANDSHAKE_SIZE: usize = 1024;
#[derive(Debug, Clone)]
pub struct NotificationsIn {
/// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, [u8]>,
protocol_name: Cow<'static, str>,
}
/// Upgrade that opens a substream, waits for the remote to accept by sending back a status
@@ -58,7 +58,7 @@ pub struct NotificationsIn {
#[derive(Debug, Clone)]
pub struct NotificationsOut {
/// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, [u8]>,
protocol_name: Cow<'static, str>,
/// Message to send when we start the handshake.
initial_message: Vec<u8>,
}
@@ -100,14 +100,14 @@ pub struct NotificationsOutSubstream<TSubstream> {
impl NotificationsIn {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
NotificationsIn {
protocol_name: protocol_name.into(),
}
}
/// Returns the name of the protocol that we accept.
pub fn protocol_name(&self) -> &[u8] {
pub fn protocol_name(&self) -> &Cow<'static, str> {
&self.protocol_name
}
}
@@ -117,7 +117,11 @@ impl UpgradeInfo for NotificationsIn {
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol_name.clone())
let bytes: Cow<'static, [u8]> = match &self.protocol_name {
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec())
};
iter::once(bytes)
}
}
@@ -244,7 +248,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
impl NotificationsOut {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, [u8]>>, initial_message: impl Into<Vec<u8>>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, str>>, initial_message: impl Into<Vec<u8>>) -> Self {
let initial_message = initial_message.into();
if initial_message.len() > MAX_HANDSHAKE_SIZE {
error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
@@ -262,7 +266,11 @@ impl UpgradeInfo for NotificationsOut {
type InfoIter = iter::Once<Self::Info>;
fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.protocol_name.clone())
let bytes: Cow<'static, [u8]> = match &self.protocol_name {
Cow::Borrowed(s) => Cow::Borrowed(s.as_bytes()),
Cow::Owned(s) => Cow::Owned(s.as_bytes().to_vec())
};
iter::once(bytes)
}
}
@@ -378,10 +386,11 @@ mod tests {
use async_std::net::{TcpListener, TcpStream};
use futures::{prelude::*, channel::oneshot};
use libp2p::core::upgrade;
use std::borrow::Cow;
#[test]
fn basic_works() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1";
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
@@ -420,7 +429,7 @@ mod tests {
fn empty_handshake() {
// Check that everything still works when the handshake messages are empty.
const PROTO_NAME: &'static [u8] = b"/test/proto/1";
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
@@ -457,7 +466,7 @@ mod tests {
#[test]
fn refused() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1";
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
@@ -495,7 +504,7 @@ mod tests {
#[test]
fn large_initial_message_refused() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1";
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {
@@ -526,7 +535,7 @@ mod tests {
#[test]
fn large_handshake_refused() {
const PROTO_NAME: &'static [u8] = b"/test/proto/1";
const PROTO_NAME: Cow<'static, str> = Cow::Borrowed("/test/proto/1");
let (listener_addr_tx, listener_addr_rx) = oneshot::channel();
let client = async_std::task::spawn(async move {