Stop sending messages on legacy substream altogether (#6975)

* Stop sending messages on legacy substream altogether

* Ensure that handshake is sent back even in case of back-pressure

* Update client/network/src/protocol/generic_proto/handler/group.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Also process OpenRequest and Closed

* Also process OpenRequest and Closed

* Fix bad merge

* God I'm so lost with all these merges

* Immediately return Closed

* Add warning for sending on non-registered protocol

* Register GrandPa protocol in tests

* Update client/network/src/protocol/generic_proto/handler/group.rs

Co-authored-by: Max Inden <mail@max-inden.de>

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Pierre Krieger
2020-09-02 17:28:03 +02:00
committed by GitHub
parent 1d10db3184
commit 34980ec88a
9 changed files with 71 additions and 434 deletions
@@ -553,7 +553,6 @@ impl GenericProto {
target: &PeerId,
protocol_name: Cow<'static, str>,
message: impl Into<Vec<u8>>,
encoded_fallback_message: Vec<u8>,
) {
let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
None => {
@@ -574,33 +573,10 @@ impl GenericProto {
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
notifs_sink.send_sync_notification(
protocol_name,
encoded_fallback_message,
message
);
}
/// Sends a message to a peer.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) {
let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) {
None => {
debug!(target: "sub-libp2p",
"Tried to sent packet to {:?} without an open channel.",
target);
return
}
Some(sink) => sink
};
trace!(target: "sub-libp2p", "External API => Packet for {:?}", target);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
notifs_sink.send_legacy(message);
}
/// Returns the state of the peerset manager, for debugging purposes.
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
self.peerset.debug_info()
@@ -262,16 +262,10 @@ struct NotificationsSinkInner {
/// dedicated to the peer.
#[derive(Debug)]
enum NotificationsSinkMessage {
/// Message emitted by [`NotificationsSink::send_legacy`].
Legacy {
message: Vec<u8>,
},
/// Message emitted by [`NotificationsSink::reserve_notification`] and
/// [`NotificationsSink::write_notification_now`].
Notification {
protocol_name: Cow<'static, str>,
encoded_fallback_message: Vec<u8>,
message: Vec<u8>,
},
@@ -280,26 +274,6 @@ enum NotificationsSinkMessage {
}
impl NotificationsSink {
/// Sends a message to the peer using the legacy substream.
///
/// If too many messages are already buffered, the message is silently discarded and the
/// connection to the peer will be closed shortly after.
///
/// This method will be removed in a future version.
pub fn send_legacy<'a>(&'a self, message: impl Into<Vec<u8>>) {
let mut lock = self.inner.sync_channel.lock();
let result = lock.try_send(NotificationsSinkMessage::Legacy {
message: message.into()
});
if result.is_err() {
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
// buffer, and therefore that `try_send` will succeed.
let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose);
debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
}
}
/// Sends a notification to the peer.
///
/// If too many messages are already buffered, the notification is silently discarded and the
@@ -312,13 +286,11 @@ impl NotificationsSink {
pub fn send_sync_notification<'a>(
&'a self,
protocol_name: Cow<'static, str>,
encoded_fallback_message: impl Into<Vec<u8>>,
message: impl Into<Vec<u8>>
) {
let mut lock = self.inner.sync_channel.lock();
let result = lock.try_send(NotificationsSinkMessage::Notification {
protocol_name: protocol_name,
encoded_fallback_message: encoded_fallback_message.into(),
protocol_name,
message: message.into()
});
@@ -364,12 +336,10 @@ impl<'a> Ready<'a> {
/// Returns an error if the substream has been closed.
pub fn send(
mut self,
encoded_fallback_message: impl Into<Vec<u8>>,
notification: impl Into<Vec<u8>>
) -> Result<(), ()> {
self.lock.start_send(NotificationsSinkMessage::Notification {
protocol_name: self.protocol_name,
encoded_fallback_message: encoded_fallback_message.into(),
message: notification.into(),
}).map_err(|_| ())
}
@@ -602,26 +572,38 @@ impl ProtocolsHandler for NotifsHandler {
};
match message {
NotificationsSinkMessage::Legacy { message } => {
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage {
message
});
}
NotificationsSinkMessage::Notification {
protocol_name,
encoded_fallback_message,
message
} => {
let mut found_any_with_name = false;
for (handler, _) in &mut self.out_handlers {
if *handler.protocol_name() == protocol_name && handler.is_open() {
handler.send_or_discard(message);
continue 'poll_notifs_sink;
if *handler.protocol_name() == protocol_name {
found_any_with_name = true;
if handler.is_open() {
handler.send_or_discard(message);
continue 'poll_notifs_sink;
}
}
}
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage {
message: encoded_fallback_message,
});
// This code can be reached via the following scenarios:
//
// - User tried to send a notification on a non-existing protocol. This
// most likely relates to https://github.com/paritytech/substrate/issues/6827
// - User tried to send a notification to a peer we're not or no longer
// connected to. This happens in a normal scenario due to the racy nature
// of connections and disconnections, and is benign.
//
// We print a warning in the former condition.
if !found_any_with_name {
log::warn!(
target: "sub-libp2p",
"Tried to send a notification on non-registered protocol: {:?}",
protocol_name
);
}
}
NotificationsSinkMessage::ForceClose => {
return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged));
@@ -204,12 +204,6 @@ pub enum LegacyProtoHandlerIn {
/// The node should stop using custom protocols.
Disable,
/// Sends a message through a custom protocol substream.
SendCustomMessage {
/// The message to send.
message: Vec<u8>,
},
}
/// Event that can be emitted by a `LegacyProtoHandler`.
@@ -495,17 +489,6 @@ impl LegacyProtoHandler {
ProtocolState::KillAsap => ProtocolState::KillAsap,
};
}
/// Sends a message to the remote.
fn send_message(&mut self, message: Vec<u8>) {
match self.state {
ProtocolState::Normal { ref mut substreams, .. } =>
substreams[0].send_message(message),
_ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \
with {:?}", self.remote_peer_id)
}
}
}
impl ProtocolsHandler for LegacyProtoHandler {
@@ -539,12 +522,9 @@ impl ProtocolsHandler for LegacyProtoHandler {
match message {
LegacyProtoHandlerIn::Disable => self.disable(),
LegacyProtoHandlerIn::Enable => self.enable(),
LegacyProtoHandlerIn::SendCustomMessage { message } =>
self.send_message(message),
}
}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr<io::Error>) {
let is_severe = match err {
ProtocolsHandlerUpgrErr::Upgrade(_) => true,
@@ -16,19 +16,16 @@
#![cfg(test)]
use futures::{prelude::*, ready};
use codec::{Encode, Decode};
use libp2p::core::connection::{ConnectionId, ListenerId};
use libp2p::core::ConnectedPoint;
use libp2p::swarm::{Swarm, ProtocolsHandler, IntoProtocolsHandler};
use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::{PeerId, Multiaddr, Transport};
use rand::seq::SliceRandom;
use std::{error, io, task::Context, task::Poll, time::Duration};
use std::collections::HashSet;
use crate::protocol::message::{generic::BlockResponse, Message};
use crate::protocol::generic_proto::{GenericProto, GenericProtoOut};
use sp_test_primitives::Block;
use futures::prelude::*;
use libp2p::{PeerId, Multiaddr, Transport};
use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint};
use libp2p::swarm::{
Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters,
NetworkBehaviour, NetworkBehaviourAction
};
use std::{error, io, task::Context, task::Poll, time::Duration};
/// Builds two nodes that have each other as bootstrap nodes.
/// This is to be used only for testing, and a panic will happen if something goes wrong.
@@ -216,137 +213,6 @@ impl NetworkBehaviour for CustomProtoWithAddr {
}
}
#[test]
fn two_nodes_transfer_lots_of_packets() {
// We spawn two nodes, then make the first one send lots of packets to the second one. The test
// ends when the second one has received all of them.
// This test consists in transferring this given number of packets. Considering that (by
// design) the connection gets closed if one of the remotes can't follow the pace, this number
// should not exceed the size of the buffer of pending notifications.
const NUM_PACKETS: u32 = 512;
let (mut service1, mut service2) = build_nodes();
let fut1 = future::poll_fn(move |cx| -> Poll<()> {
loop {
match ready!(service1.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { peer_id, .. }) => {
for n in 0 .. NUM_PACKETS {
service1.send_packet(
&peer_id,
Message::<Block>::BlockResponse(BlockResponse {
id: n as _,
blocks: Vec::new(),
}).encode()
);
}
},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
_ => panic!(),
}
}
});
let mut packet_counter = 0u32;
let fut2 = future::poll_fn(move |cx| {
loop {
match ready!(service2.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
Some(GenericProtoOut::LegacyMessage { message, .. }) => {
match Message::<Block>::decode(&mut &message[..]).unwrap() {
Message::<Block>::BlockResponse(BlockResponse { id: _, blocks }) => {
assert!(blocks.is_empty());
packet_counter += 1;
if packet_counter == NUM_PACKETS {
return Poll::Ready(())
}
},
_ => panic!(),
}
}
_ => panic!(),
}
}
});
futures::executor::block_on(async move {
future::select(fut1, fut2).await;
});
}
#[test]
fn basic_two_nodes_requests_in_parallel() {
let (mut service1, mut service2) = build_nodes();
// Generate random messages with or without a request id.
let mut to_send = {
let mut to_send = Vec::new();
let mut existing_ids = HashSet::new();
for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode.
let req_id = loop {
let req_id = rand::random::<u64>();
// ensure uniqueness - odds of randomly sampling collisions
// is unlikely, but possible to cause spurious test failures.
if existing_ids.insert(req_id) {
break req_id;
}
};
to_send.push(Message::<Block>::BlockResponse(
BlockResponse { id: req_id, blocks: Vec::new() }
));
}
to_send
};
// Clone `to_send` in `to_receive`. Below we will remove from `to_receive` the messages we
// receive, until the list is empty.
let mut to_receive = to_send.clone();
to_send.shuffle(&mut rand::thread_rng());
let fut1 = future::poll_fn(move |cx| -> Poll<()> {
loop {
match ready!(service1.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { peer_id, .. }) => {
for msg in to_send.drain(..) {
service1.send_packet(&peer_id, msg.encode());
}
},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
_ => panic!(),
}
}
});
let fut2 = future::poll_fn(move |cx| {
loop {
match ready!(service2.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
Some(GenericProtoOut::LegacyMessage { message, .. }) => {
let pos = to_receive.iter().position(|m| m.encode() == message).unwrap();
to_receive.remove(pos);
if to_receive.is_empty() {
return Poll::Ready(())
}
}
_ => panic!(),
}
}
});
futures::executor::block_on(async move {
future::select(fut1, fut2).await;
});
}
#[test]
fn reconnect_after_disconnect() {
// We connect two nodes together, then force a disconnect (through the API of the `Service`),
@@ -123,15 +123,6 @@ impl<TSubstream> RegisteredProtocolSubstream<TSubstream> {
self.is_closing = true;
self.send_queue.clear();
}
/// Sends a message to the substream.
pub fn send_message(&mut self, data: Vec<u8>) {
if self.is_closing {
return
}
self.send_queue.push_back(From::from(&data[..]));
}
}
/// Event produced by the `RegisteredProtocolSubstream`.