Split Substrate messages into multiple substreams (#1796)

* Split Substrate messages into multiple substreams

* Add back Clogged event
This commit is contained in:
Pierre Krieger
2019-02-25 11:55:45 +01:00
committed by Bastian Köcher
parent dab5ad913f
commit 733ce7d616
10 changed files with 1032 additions and 241 deletions
+5 -4
View File
@@ -1624,7 +1624,7 @@ dependencies = [
"libp2p-core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-io 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
"yamux 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"yamux 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -3938,6 +3938,7 @@ dependencies = [
name = "substrate-network-libp2p"
version = "0.1.0"
dependencies = [
"byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -4643,7 +4644,7 @@ name = "twox-hash"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -4950,7 +4951,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "yamux"
version = "0.1.6"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -5355,4 +5356,4 @@ dependencies = [
"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
"checksum xdg 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d089681aa106a86fade1b0128fb5daf07d5867a509ab036d99988dec80429a57"
"checksum yaml-rust 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992"
"checksum yamux 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "e25561b512df3c287cf52404cab0b07ea43d095cb96230e9e2cb635db72d75f0"
"checksum yamux 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "56626765982b12c2f4b59529e1d2ce0a7c25499865e6edf8b502dceb51b65fe2"
+1
View File
@@ -8,6 +8,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
byteorder = "1.3"
bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
@@ -22,7 +22,7 @@ use crate::parse_str_addr;
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p::core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId};
use libp2p::core::{protocols_handler::ProtocolsHandler, Endpoint, Multiaddr, PeerId};
use log::{debug, trace, warn};
use smallvec::SmallVec;
use std::{cmp, error, io, marker::PhantomData, path::Path, time::Duration, time::Instant};
@@ -458,13 +458,13 @@ where
trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (active)", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::EnableActive,
event: CustomProtosHandlerIn::Enable(Endpoint::Dialer),
});
} else {
trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (passive)", peer_id);
self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: CustomProtosHandlerIn::EnablePassive,
event: CustomProtosHandlerIn::Enable(Endpoint::Listener),
});
}
@@ -581,10 +581,15 @@ where
messages,
}));
}
CustomProtosHandlerOut::ProtocolError { protocol_id, error } => {
warn!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \
{:?}: {:?}", source, protocol_id, error);
self.ban_peer(source);
CustomProtosHandlerOut::ProtocolError { protocol_id, error, is_severe } => {
if is_severe {
warn!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \
{:?}: {:?}", source, protocol_id, error);
self.ban_peer(source);
} else {
debug!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \
{:?}: {:?}", source, protocol_id, error);
}
}
}
}
File diff suppressed because it is too large Load Diff
@@ -15,7 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
pub use self::behaviour::{CustomProtos, CustomProtosOut};
pub use self::upgrade::{CustomMessage, RegisteredProtocol, RegisteredProtocols};
pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol, RegisteredProtocols};
mod behaviour;
mod handler;
@@ -415,7 +415,12 @@ impl NetTopology {
continue
}
debug_assert!(!a.is_connected());
// It is possible that we are connected to this address, and that the dial failure
// concerns another peer.
if a.is_connected() {
continue
}
a.adjust_score(SCORE_DIFF_ON_FAILED_TO_CONNECT);
trace!(target: "sub-libp2p", "Back off for {} = {:?}", addr, a.next_back_off);
a.back_off_until = Instant::now() + a.next_back_off;
@@ -16,10 +16,10 @@
use crate::ProtocolId;
use bytes::Bytes;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use libp2p::core::{Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use libp2p::tokio_codec::Framed;
use log::warn;
use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter};
use std::{collections::VecDeque, io, iter, marker::PhantomData, vec::IntoIter as VecIntoIter};
use futures::{prelude::*, future, stream};
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec::UviBytes;
@@ -84,6 +84,9 @@ impl<TMessage> Clone for RegisteredProtocol<TMessage> {
pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
/// If true, we are in the process of closing the sink.
is_closing: bool,
/// Whether the local node opened this substream (dialer), or we received this substream from
/// the remote (listener).
endpoint: Endpoint,
/// Buffer of packets to send.
send_queue: VecDeque<Vec<u8>>,
/// If true, we should call `poll_complete` on the inner sink.
@@ -97,6 +100,9 @@ pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
/// unless the buffer empties then fills itself again.
clogged_fuse: bool,
/// If true, then this substream uses the "/multi/" version of the protocol. This is a hint
/// that the handler can behave differently.
is_multiplex: bool,
/// Marker to pin the generic.
marker: PhantomData<TMessage>,
}
@@ -114,6 +120,18 @@ impl<TMessage, TSubstream> RegisteredProtocolSubstream<TMessage, TSubstream> {
self.protocol_version
}
/// Returns whether the local node opened this substream (dialer), or we received this
/// substream from the remote (listener).
pub fn endpoint(&self) -> Endpoint {
self.endpoint
}
/// Returns true if we negotiated the "multiplexed" version. This means that the handler can
/// open multiple substreams instead of just one.
pub fn is_multiplex(&self) -> bool {
self.is_multiplex
}
/// Starts a graceful shutdown process on this substream.
///
/// Note that "graceful" means that we sent a closing message. We don't wait for any
@@ -138,14 +156,39 @@ impl<TMessage, TSubstream> RegisteredProtocolSubstream<TMessage, TSubstream> {
/// Implemented on messages that can be sent or received on the network.
pub trait CustomMessage {
/// Turns a message into raw bytes.
/// Turns a message into the raw bytes to send over the network.
fn into_bytes(self) -> Vec<u8>;
/// Tries to part `bytes` into a message.
/// Tries to parse `bytes` received from the network into a message.
fn from_bytes(bytes: &[u8]) -> Result<Self, ()>
where Self: Sized;
/// Returns a unique ID that is used to match request and responses.
///
/// The networking layer employs multiplexing in order to have multiple parallel data streams.
/// Transmitting messages over the network uses two kinds of substreams:
///
/// - Undirectional substreams, where we send a single message then close the substream.
/// - Bidirectional substreams, where we send a message then wait for a response. Once the
/// response has arrived, we close the substream.
///
/// If `request_id()` returns `OneWay`, then this message will be sent or received over a
/// unidirectional substream. If instead it returns `Request` or `Response`, then we use the
/// value to match a request with its response.
fn request_id(&self) -> CustomMessageId;
}
/// This trait implementation exists mostly for testing convenience.
/// See the documentation of `CustomMessage::request_id`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CustomMessageId {
OneWay,
Request(u64),
Response(u64),
}
// These trait implementations exist mostly for testing convenience. This should eventually be
// removed.
impl CustomMessage for Vec<u8> {
fn into_bytes(self) -> Vec<u8> {
self
@@ -154,6 +197,45 @@ impl CustomMessage for Vec<u8> {
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
Ok(bytes.to_vec())
}
fn request_id(&self) -> CustomMessageId {
CustomMessageId::OneWay
}
}
impl CustomMessage for (Option<u64>, Vec<u8>) {
fn into_bytes(self) -> Vec<u8> {
use byteorder::WriteBytesExt;
use std::io::Write;
let mut out = Vec::new();
out.write_u64::<byteorder::BigEndian>(self.0.unwrap_or(u64::max_value()))
.expect("Writing to a Vec can never fail");
out.write_all(&self.1).expect("Writing to a Vec can never fail");
out
}
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
use byteorder::ReadBytesExt;
use std::io::Read;
let mut rdr = std::io::Cursor::new(bytes);
let id = rdr.read_u64::<byteorder::BigEndian>().map_err(|_| ())?;
let mut out = Vec::new();
rdr.read_to_end(&mut out).map_err(|_| ())?;
let id = if id == u64::max_value() {
None
} else {
Some(id)
};
Ok((id, out))
}
fn request_id(&self) -> CustomMessageId {
if let Some(id) = self.0 {
CustomMessageId::Request(id)
} else {
CustomMessageId::OneWay
}
}
}
/// Event produced by the `RegisteredProtocolSubstream`.
@@ -176,11 +258,6 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// If we are closing, close as soon as the Sink is closed.
if self.is_closing {
return Ok(self.inner.close()?.map(|()| None))
}
// Flushing the local queue.
while let Some(packet) = self.send_queue.pop_front() {
match self.inner.start_send(packet)? {
@@ -192,6 +269,11 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
}
}
// If we are closing, close as soon as the Sink is closed.
if self.is_closing {
return Ok(self.inner.close()?.map(|()| None))
}
// Indicating that the remote is clogged if that's the case.
if self.send_queue.len() >= 2048 {
if !self.clogged_fuse {
@@ -227,13 +309,13 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
io::ErrorKind::InvalidData
})?;
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message))))
},
}
Async::Ready(None) =>
if !self.requires_poll_complete && self.send_queue.is_empty() {
Ok(Async::Ready(None))
} else {
Ok(Async::NotReady)
},
}
Async::NotReady => Ok(Async::NotReady),
}
}
@@ -246,14 +328,33 @@ impl<TMessage> UpgradeInfo for RegisteredProtocol<TMessage> {
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
// Report each version as an individual protocol.
self.supported_versions.iter().map(|&version| {
self.supported_versions.iter().flat_map(|&version| {
let num = version.to_string();
let mut name = self.base_name.clone();
name.extend_from_slice(num.as_bytes());
RegisteredProtocolName {
name,
// Note that `name1` is the multiplex version, as we priviledge it over the old one.
let mut name1 = self.base_name.clone();
name1.extend_from_slice(b"multi/");
name1.extend_from_slice(num.as_bytes());
let proto1 = RegisteredProtocolName {
name: name1,
version,
}
is_multiplex: true,
};
let mut name2 = self.base_name.clone();
name2.extend_from_slice(num.as_bytes());
let proto2 = RegisteredProtocolName {
name: name2,
version,
is_multiplex: false,
};
// Important note: we prioritize the backwards compatible mode for now.
// After some intensive testing has been done, we should switch to the new mode by
// default.
// Then finally we can remove the old mode after everyone has switched.
// See https://github.com/paritytech/substrate/issues/1692
iter::once(proto2).chain(iter::once(proto1))
}).collect::<Vec<_>>().into_iter()
}
}
@@ -265,6 +366,8 @@ pub struct RegisteredProtocolName {
name: Bytes,
/// Version number. Stored in string form in `name`, but duplicated here for easier retrieval.
version: u8,
/// If true, then this version is the one with the multiplexing.
is_multiplex: bool,
}
impl ProtocolName for RegisteredProtocolName {
@@ -289,12 +392,14 @@ where TSubstream: AsyncRead + AsyncWrite,
future::ok(RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Listener,
send_queue: VecDeque::new(),
requires_poll_complete: false,
inner: framed.fuse(),
protocol_id: self.id,
protocol_version: info.version,
clogged_fuse: false,
is_multiplex: info.is_multiplex,
marker: PhantomData,
})
}
@@ -312,8 +417,20 @@ where TSubstream: AsyncRead + AsyncWrite,
socket: TSubstream,
info: Self::Info,
) -> Self::Future {
// Upgrades are symmetrical.
self.upgrade_inbound(socket, info)
let framed = Framed::new(socket, UviBytes::default());
future::ok(RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Dialer,
send_queue: VecDeque::new(),
requires_poll_complete: false,
inner: framed.fuse(),
protocol_id: self.id,
protocol_version: info.version,
clogged_fuse: false,
is_multiplex: info.is_multiplex,
marker: PhantomData,
})
}
}
@@ -326,11 +443,6 @@ impl<TMessage> RegisteredProtocols<TMessage> {
pub fn len(&self) -> usize {
self.0.len()
}
/// Returns true if the given protocol is in the list.
pub fn has_protocol(&self, protocol: ProtocolId) -> bool {
self.0.iter().any(|p| p.id == protocol)
}
}
impl<TMessage> Default for RegisteredProtocols<TMessage> {
+1 -1
View File
@@ -24,7 +24,7 @@ mod service_task;
mod traits;
mod transport;
pub use crate::custom_proto::{CustomMessage, RegisteredProtocol};
pub use crate::custom_proto::{CustomMessage, CustomMessageId, RegisteredProtocol};
pub use crate::error::{Error, ErrorKind, DisconnectReason};
pub use crate::secret::obtain_private_key;
pub use crate::service_task::{start_service, Service, ServiceEvent};
+71 -2
View File
@@ -15,6 +15,7 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use futures::{future, stream, prelude::*, try_ready};
use rand::seq::SliceRandom;
use std::{io, iter};
use substrate_network_libp2p::{CustomMessage, Protocol, ServiceEvent, build_multiaddr};
@@ -85,7 +86,10 @@ fn basic_two_nodes_connectivity() {
fn two_nodes_transfer_lots_of_packets() {
// We spawn two nodes, then make the first one send lots of packets to the second one. The test
// ends when the second one has received all of them.
const NUM_PACKETS: u32 = 20000;
// Note that if we go too high, we will reach the limit to the number of simultaneous
// substreams allowed by the multiplexer.
const NUM_PACKETS: u32 = 5000;
let (mut service1, mut service2) = {
let mut l = build_nodes::<Vec<u8>>(2).into_iter();
@@ -114,7 +118,6 @@ fn two_nodes_transfer_lots_of_packets() {
Some(ServiceEvent::OpenedCustomProtocol { .. }) => {},
Some(ServiceEvent::CustomMessage { message, .. }) => {
assert_eq!(message.len(), 1);
assert_eq!(u32::from(message[0]), packet_counter % 256);
packet_counter += 1;
if packet_counter == NUM_PACKETS {
return Ok(Async::Ready(()))
@@ -189,3 +192,69 @@ fn many_nodes_connectivity() {
tokio::runtime::Runtime::new().unwrap().block_on(combined).unwrap();
}
#[test]
fn basic_two_nodes_requests_in_parallel() {
let (mut service1, mut service2) = {
let mut l = build_nodes::<(Option<u64>, Vec<u8>)>(2).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
};
// Generate random messages with or without a request id.
let mut to_send = {
let mut to_send = Vec::new();
let mut next_id = 0;
for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode.
let id = if rand::random::<usize>() % 4 != 0 {
let i = next_id;
next_id += 1;
Some(i)
} else {
None
};
let msg = (id, (0..10).map(|_| rand::random::<u8>()).collect::<Vec<_>>());
to_send.push(msg);
}
to_send
};
// Clone `to_send` in `to_receive`. Below we will remove from `to_receive` the messages we
// receive, until the list is empty.
let mut to_receive = to_send.clone();
to_send.shuffle(&mut rand::thread_rng());
let fut1 = future::poll_fn(move || -> io::Result<_> {
loop {
match try_ready!(service1.poll()) {
Some(ServiceEvent::OpenedCustomProtocol { node_index, protocol, .. }) => {
for msg in to_send.drain(..) {
service1.send_custom_message(node_index, protocol, msg);
}
},
_ => panic!(),
}
}
});
let fut2 = future::poll_fn(move || -> io::Result<_> {
loop {
match try_ready!(service2.poll()) {
Some(ServiceEvent::OpenedCustomProtocol { .. }) => {},
Some(ServiceEvent::CustomMessage { message, .. }) => {
let pos = to_receive.iter().position(|m| *m == message).unwrap();
to_receive.remove(pos);
if to_receive.is_empty() {
return Ok(Async::Ready(()))
}
}
_ => panic!(),
}
}
});
let combined = fut1.select(fut2).map_err(|(err, _)| err);
tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap();
}
+21 -1
View File
@@ -129,7 +129,7 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use parity_codec::{Encode, Decode};
use network_libp2p::CustomMessage;
use network_libp2p::{CustomMessage, CustomMessageId};
use runtime_primitives::Justification;
use parity_codec_derive::{Encode, Decode};
use crate::config::Roles;
@@ -218,6 +218,26 @@ pub mod generic {
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
Decode::decode(&mut &bytes[..]).ok_or(())
}
fn request_id(&self) -> CustomMessageId {
match *self {
Message::Status(_) => CustomMessageId::OneWay,
Message::BlockRequest(ref req) => CustomMessageId::Request(req.id),
Message::BlockResponse(ref resp) => CustomMessageId::Response(resp.id),
Message::BlockAnnounce(_) => CustomMessageId::OneWay,
Message::Transactions(_) => CustomMessageId::OneWay,
Message::Consensus(_) => CustomMessageId::OneWay,
Message::RemoteCallRequest(ref req) => CustomMessageId::Request(req.id),
Message::RemoteCallResponse(ref resp) => CustomMessageId::Response(resp.id),
Message::RemoteReadRequest(ref req) => CustomMessageId::Request(req.id),
Message::RemoteReadResponse(ref resp) => CustomMessageId::Response(resp.id),
Message::RemoteHeaderRequest(ref req) => CustomMessageId::Request(req.id),
Message::RemoteHeaderResponse(ref resp) => CustomMessageId::Response(resp.id),
Message::RemoteChangesRequest(ref req) => CustomMessageId::Request(req.id),
Message::RemoteChangesResponse(ref resp) => CustomMessageId::Response(resp.id),
Message::ChainSpecific(_) => CustomMessageId::OneWay,
}
}
}
/// Status sent on connection.