Send Status message on all newly-opened legacy substreams (#6593)

* Send Status message on all newly-opened legacy substreams

* Fix tests
This commit is contained in:
Pierre Krieger
2020-07-08 13:44:51 +02:00
committed by GitHub
parent 6eb2eb81c5
commit 8ca05879e8
5 changed files with 119 additions and 114 deletions
@@ -30,12 +30,13 @@ use libp2p::swarm::{
PollParameters
};
use log::{debug, error, trace, warn};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use std::task::{Context, Poll};
use std::{borrow::Cow, cmp, collections::{hash_map::Entry, VecDeque}};
use std::{error, mem, pin::Pin, str, time::Duration};
use std::{error, mem, pin::Pin, str, sync::Arc, time::Duration};
use wasm_timer::Instant;
/// Network behaviour that handles opening substreams for custom protocols with other peers.
@@ -118,7 +119,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, [u8]>, Vec<u8>)>,
notif_protocols: Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>,
/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
@@ -220,20 +221,6 @@ enum PeerState {
}
impl PeerState {
/// True if there exists any established connection to the peer.
fn is_connected(&self) -> bool {
match self {
PeerState::Disabled { .. } |
PeerState::DisabledPendingEnable { .. } |
PeerState::Enabled { .. } |
PeerState::PendingRequest { .. } |
PeerState::Requested |
PeerState::Incoming { .. } => true,
PeerState::Poisoned |
PeerState::Banned { .. } => false,
}
}
/// True if there exists an established connection to the peer
/// that is open for custom protocol traffic.
fn is_open(&self) -> bool {
@@ -343,10 +330,12 @@ impl GenericProto {
local_peer_id: PeerId,
protocol: impl Into<ProtocolId>,
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
queue_size_report: Option<HistogramVec>,
) -> Self {
let legacy_protocol = RegisteredProtocol::new(protocol, versions);
let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);
GenericProto {
local_peer_id,
@@ -372,7 +361,7 @@ impl GenericProto {
protocol_name: impl Into<Cow<'static, [u8]>>,
handshake_msg: impl Into<Vec<u8>>
) {
self.notif_protocols.push((protocol_name.into(), handshake_msg.into()));
self.notif_protocols.push((protocol_name.into(), Arc::new(RwLock::new(handshake_msg.into()))));
}
/// Modifies the handshake of the given notifications protocol.
@@ -383,24 +372,17 @@ impl GenericProto {
protocol_name: &[u8],
handshake_message: impl Into<Vec<u8>>
) {
let handshake_message = handshake_message.into();
if let Some(protocol) = self.notif_protocols.iter_mut().find(|(name, _)| name == &protocol_name) {
protocol.1 = handshake_message.clone();
} else {
return;
*protocol.1.write() = handshake_message.into();
}
}
// Send an event to all the peers we're connected to, updating the handshake message.
for (peer_id, _) in self.peers.iter().filter(|(_, state)| state.is_connected()) {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.clone(),
handler: NotifyHandler::All,
event: NotifsHandlerIn::UpdateHandshake {
protocol_name: Cow::Owned(protocol_name.to_owned()),
handshake_message: handshake_message.clone(),
},
});
}
/// Modifies the handshake of the legacy protocol.
pub fn set_legacy_handshake_message(
&mut self,
handshake_message: impl Into<Vec<u8>>
) {
*self.legacy_protocol.handshake_message().write() = handshake_message.into();
}
/// Returns the number of discovered nodes that we keep in memory.
@@ -64,8 +64,9 @@ use libp2p::swarm::{
NegotiatedSubstream,
};
use log::{debug, error};
use parking_lot::RwLock;
use prometheus_endpoint::HistogramVec;
use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}};
/// Implements the `IntoProtocolsHandler` trait of libp2p.
///
@@ -77,10 +78,10 @@ use std::{borrow::Cow, error, io, str, task::{Context, Poll}};
pub struct NotifsHandlerProto {
/// Prototypes for handlers for inbound substreams, and the message we respond with in the
/// handshake.
in_handlers: Vec<(NotifsInHandlerProto, Vec<u8>)>,
in_handlers: Vec<(NotifsInHandlerProto, Arc<RwLock<Vec<u8>>>)>,
/// Prototypes for handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandlerProto, Vec<u8>)>,
out_handlers: Vec<(NotifsOutHandlerProto, Arc<RwLock<Vec<u8>>>)>,
/// Prototype for handler for backwards-compatibility.
legacy: LegacyProtoHandlerProto,
@@ -91,10 +92,10 @@ pub struct NotifsHandlerProto {
/// See the documentation at the module level for more information.
pub struct NotifsHandler {
/// Handlers for inbound substreams, and the message we respond with in the handshake.
in_handlers: Vec<(NotifsInHandler, Vec<u8>)>,
in_handlers: Vec<(NotifsInHandler, Arc<RwLock<Vec<u8>>>)>,
/// Handlers for outbound substreams, and the initial handshake message we send.
out_handlers: Vec<(NotifsOutHandler, Vec<u8>)>,
out_handlers: Vec<(NotifsOutHandler, Arc<RwLock<Vec<u8>>>)>,
/// Handler for backwards-compatibility.
legacy: LegacyProtoHandler,
@@ -161,18 +162,6 @@ pub enum NotifsHandlerIn {
message: Vec<u8>,
},
/// Modifies the handshake message of a notifications protocol.
UpdateHandshake {
/// Name of the protocol for the message.
///
/// Must match one of the registered protocols.
protocol_name: Cow<'static, [u8]>,
/// The new handshake message to send if we open a substream or if the remote opens a
/// substream towards us.
handshake_message: Vec<u8>,
},
/// Sends a notifications message.
SendNotification {
/// Name of the protocol for the message.
@@ -253,7 +242,7 @@ impl NotifsHandlerProto {
/// messages queue. If passed, it must have one label for the protocol name.
pub fn new(
legacy: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, [u8]>, Vec<u8>)>>,
list: impl Into<Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>>,
queue_size_report: Option<HistogramVec>
) -> Self {
let list = list.into();
@@ -346,12 +335,17 @@ impl ProtocolsHandler for NotifsHandler {
self.enabled = EnabledState::Enabled;
self.legacy.inject_event(LegacyProtoHandlerIn::Enable);
for (handler, initial_message) in &mut self.out_handlers {
// We create `initial_message` on a separate line to be sure that the lock
// is released as soon as possible.
let initial_message = initial_message.read().clone();
handler.inject_event(NotifsOutHandlerIn::Enable {
initial_message: initial_message.clone(),
initial_message,
});
}
for num in self.pending_in.drain(..) {
let handshake_message = self.in_handlers[num].1.clone();
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = self.in_handlers[num].1.read().clone();
self.in_handlers[num].0
.inject_event(NotifsInHandlerIn::Accept(handshake_message));
}
@@ -375,18 +369,6 @@ impl ProtocolsHandler for NotifsHandler {
},
NotifsHandlerIn::SendLegacy { message } =>
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }),
NotifsHandlerIn::UpdateHandshake { protocol_name, handshake_message } => {
for (handler, current_handshake) in &mut self.in_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
for (handler, current_handshake) in &mut self.out_handlers {
if handler.protocol_name() == &*protocol_name {
*current_handshake = handshake_message.clone();
}
}
}
NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => {
for (handler, _) in &mut self.out_handlers {
if handler.protocol_name() != &protocol_name[..] {
@@ -524,8 +506,12 @@ impl ProtocolsHandler for NotifsHandler {
ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) =>
match self.enabled {
EnabledState::Initial => self.pending_in.push(handler_num),
EnabledState::Enabled =>
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message.clone())),
EnabledState::Enabled => {
// We create `handshake_message` on a separate line to be sure
// that the lock is released as soon as possible.
let handshake_message = handshake_message.read().clone();
handler.inject_event(NotifsInHandlerIn::Accept(handshake_message))
},
EnabledState::Disabled =>
handler.inject_event(NotifsInHandlerIn::Refuse),
},
@@ -83,7 +83,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
});
let behaviour = CustomProtoWithAddr {
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], peerset, None),
inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset, None),
addrs: addrs
.iter()
.enumerate()
@@ -241,6 +241,8 @@ fn two_nodes_transfer_lots_of_packets() {
);
}
},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
_ => panic!(),
}
}
@@ -251,6 +253,8 @@ fn two_nodes_transfer_lots_of_packets() {
loop {
match ready!(service2.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
Some(GenericProtoOut::LegacyMessage { message, .. }) => {
match Message::<Block>::decode(&mut &message[..]).unwrap() {
Message::<Block>::BlockResponse(BlockResponse { id: _, blocks }) => {
@@ -312,6 +316,8 @@ fn basic_two_nodes_requests_in_parallel() {
service1.send_packet(&peer_id, msg.encode());
}
},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
_ => panic!(),
}
}
@@ -321,6 +327,8 @@ fn basic_two_nodes_requests_in_parallel() {
loop {
match ready!(service2.poll_next_unpin(cx)) {
Some(GenericProtoOut::CustomProtocolOpen { .. }) => {},
// An empty handshake is being sent after opening.
Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {},
Some(GenericProtoOut::LegacyMessage { message, .. }) => {
let pos = to_receive.iter().position(|m| m.encode() == message).unwrap();
to_receive.remove(pos);
@@ -21,7 +21,8 @@ use bytes::BytesMut;
use futures::prelude::*;
use futures_codec::Framed;
use libp2p::core::{Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use std::{collections::VecDeque, io, pin::Pin, vec::IntoIter as VecIntoIter};
use parking_lot::RwLock;
use std::{collections::VecDeque, io, pin::Pin, sync::Arc, vec::IntoIter as VecIntoIter};
use std::task::{Context, Poll};
use unsigned_varint::codec::UviBytes;
@@ -38,12 +39,13 @@ pub struct RegisteredProtocol {
/// List of protocol versions that we support.
/// Ordered in descending order so that the best comes first.
supported_versions: Vec<u8>,
/// Handshake to send after the substream is open.
handshake_message: Arc<RwLock<Vec<u8>>>,
}
impl RegisteredProtocol {
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
/// passed inside the `RegisteredProtocolOutput`.
pub fn new(protocol: impl Into<ProtocolId>, versions: &[u8])
/// Creates a new `RegisteredProtocol`.
pub fn new(protocol: impl Into<ProtocolId>, versions: &[u8], handshake_message: Arc<RwLock<Vec<u8>>>)
-> Self {
let protocol = protocol.into();
let mut base_name = b"/substrate/".to_vec();
@@ -58,8 +60,14 @@ impl RegisteredProtocol {
tmp.sort_unstable_by(|a, b| b.cmp(&a));
tmp
},
handshake_message,
}
}
/// Returns the `Arc` to the handshake message that was passed at initialization.
pub fn handshake_message(&self) -> &Arc<RwLock<Vec<u8>>> {
&self.handshake_message
}
}
impl Clone for RegisteredProtocol {
@@ -68,6 +76,7 @@ impl Clone for RegisteredProtocol {
id: self.id.clone(),
base_name: self.base_name.clone(),
supported_versions: self.supported_versions.clone(),
handshake_message: self.handshake_message.clone(),
}
}
}
@@ -244,10 +253,10 @@ impl ProtocolName for RegisteredProtocolName {
}
impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite + Unpin,
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = RegisteredProtocolSubstream<TSubstream>;
type Future = future::Ready<Result<Self::Output, io::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, io::Error>> + Send>>;
type Error = io::Error;
fn upgrade_inbound(
@@ -255,26 +264,31 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
socket: TSubstream,
info: Self::Info,
) -> Self::Future {
let framed = {
let mut codec = UviBytes::default();
codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets.
Framed::new(socket, codec)
};
Box::pin(async move {
let mut framed = {
let mut codec = UviBytes::default();
codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets.
Framed::new(socket, codec)
};
future::ok(RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Listener,
send_queue: VecDeque::new(),
requires_poll_flush: false,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
let handshake = BytesMut::from(&self.handshake_message.read()[..]);
framed.send(handshake).await?;
Ok(RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Listener,
send_queue: VecDeque::new(),
requires_poll_flush: false,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
})
})
}
}
impl<TSubstream> OutboundUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite + Unpin,
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = <Self as InboundUpgrade<TSubstream>>::Output;
type Future = <Self as InboundUpgrade<TSubstream>>::Future;
@@ -285,16 +299,25 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
socket: TSubstream,
info: Self::Info,
) -> Self::Future {
let framed = Framed::new(socket, UviBytes::default());
Box::pin(async move {
let mut framed = {
let mut codec = UviBytes::default();
codec.set_max_len(16 * 1024 * 1024); // 16 MiB hard limit for packets.
Framed::new(socket, codec)
};
future::ok(RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Dialer,
send_queue: VecDeque::new(),
requires_poll_flush: false,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
let handshake = BytesMut::from(&self.handshake_message.read()[..]);
framed.send(handshake).await?;
Ok(RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Dialer,
send_queue: VecDeque::new(),
requires_poll_flush: false,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
})
})
}
}