diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs
index beac1dec25..7c0ec61344 100644
--- a/substrate/core/network-libp2p/src/custom_proto/handler.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see .
-use crate::custom_proto::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol};
+use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream};
use futures::prelude::*;
use libp2p::core::{
@@ -25,8 +25,8 @@ use libp2p::core::{
protocols_handler::SubstreamProtocol,
upgrade::{InboundUpgrade, OutboundUpgrade}
};
-use log::{debug, error, warn};
-use smallvec::{smallvec, SmallVec};
+use log::{debug, error};
+use smallvec::SmallVec;
use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock};
@@ -69,26 +69,17 @@ use void::Void;
///
/// ## How it works
///
-/// For backwards compatibility reasons, the behaviour of the handler is quite complicated. After
-/// enough nodes have upgraded, it should be simplified by using helpers provided by libp2p.
-///
/// When the handler is created, it is initially in the `Init` state and waits for either a
/// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to
/// toggle the handler between the disabled and enabled states.
///
/// When the handler is enabled for the first time, if it is the dialer of the connection, it tries
-/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx` or a
-/// protocol named `/substrate/multi/xxx`. If it is the former, then we are in
-/// "backwards-compatibility mode". If it is the latter, we are in normal operation mode.
+/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx`, where
+/// `xxx` is chosen by the user.
///
-/// In "backwards-compatibility mode", we have one unique substream where bidirectional
-/// communications happen. If the remote closes the substream, we consider that we are now
-/// disconnected. Re-enabling is performed by re-opening the substream.
-///
-/// In normal operation mode, each request gets sent over a different substream where the response
-/// is then sent back. If the remote refuses one of our substream open request, or if an error
-/// happens on one substream, we consider that we are disconnected. Re-enabling is performed by
-/// opening an outbound substream.
+/// Then, we have one unique substream where bidirectional communications happen. If the remote
+/// closes the substream, we consider that we are now disconnected. Re-enabling is performed by
+/// re-opening the substream (even if we are not the dialer of the connection).
///
pub struct CustomProtoHandlerProto {
/// Configuration for the protocol upgrade to negotiate.
@@ -159,7 +150,6 @@ pub struct CustomProtoHandler {
/// `Clock` instance that uses the current execution context's source of time.
clock: Clock,
-
}
/// State of the handler.
@@ -181,17 +171,13 @@ enum ProtocolState {
/// Backwards-compatible mode. Contains the unique substream that is open.
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
- BackCompat {
+ Normal {
/// The unique substream where bidirectional communications happen.
substream: RegisteredProtocolSubstream,
/// Contains substreams which are being shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>,
},
- /// Normal functionning. Contains the substreams that are open.
- /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
- Normal(PerProtocolNormalState),
-
/// We are disabled. Contains substreams that are being closed.
/// If we are in this state, either we have sent a `CustomProtocolClosed` message to the
/// outside or we have never sent any `CustomProtocolOpen` in the first place.
@@ -212,128 +198,6 @@ enum ProtocolState {
Poisoned,
}
-/// Normal functionning mode for a protocol.
-struct PerProtocolNormalState {
- /// Optional substream that we opened.
- outgoing_substream: Option>,
-
- /// Substreams that have been opened by the remote. We are waiting for a packet from it.
- incoming_substreams: SmallVec<[RegisteredProtocolSubstream; 4]>,
-
- /// For each request that has been sent to the remote, contains the substream where we
- /// expect a response.
- pending_response: SmallVec<[(u64, RegisteredProtocolSubstream); 4]>,
-
- /// For each request received by the remote, contains the substream where to send back our
- /// response. Once a response has been sent, the substream closes.
- pending_send_back: SmallVec<[(u64, RegisteredProtocolSubstream); 4]>,
-
- /// List of messages waiting for a substream to open in order to be sent.
- pending_messages: SmallVec<[TMessage; 6]>,
-
- /// Contains substreams which are being shut down.
- shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>,
-}
-
-impl PerProtocolNormalState
-where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
- /// Polls for things that are new. Same API constraints as `Future::poll()`.
- /// Optionally returns the event to produce.
- /// You must pass the `protocol_id` as we need have to inject it in the returned event.
- /// API note: Ideally we wouldn't need to be passed a `ProtocolId`, and we would return a
- /// different enum that doesn't contain any `protocol_id`, and the caller would inject
- /// the ID itself, but that's a ton of code for not much gain.
- fn poll(&mut self) -> Option> {
- for n in (0..self.pending_response.len()).rev() {
- let (request_id, mut substream) = self.pending_response.swap_remove(n);
- match substream.poll() {
- Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
- if message.request_id() == CustomMessageId::Response(request_id) {
- let event = CustomProtoHandlerOut::CustomMessage {
- message
- };
- self.shutdown.push(substream);
- return Some(event);
- } else {
- self.shutdown.push(substream);
- let event = CustomProtoHandlerOut::ProtocolError {
- is_severe: true,
- error: format!("Request ID doesn't match substream: expected {:?}, \
- got {:?}", request_id, message.request_id()).into(),
- };
- return Some(event);
- }
- },
- Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) =>
- unreachable!("Cannot receive Clogged message with new protocol version; QED"),
- Ok(Async::NotReady) =>
- self.pending_response.push((request_id, substream)),
- Ok(Async::Ready(None)) => {
- self.shutdown.push(substream);
- let event = CustomProtoHandlerOut::ProtocolError {
- is_severe: false,
- error: format!("Request ID {:?} didn't receive an answer", request_id).into(),
- };
- return Some(event);
- }
- Err(err) => {
- self.shutdown.push(substream);
- let event = CustomProtoHandlerOut::ProtocolError {
- is_severe: false,
- error: format!("Error while waiting for an answer for {:?}: {}",
- request_id, err).into(),
- };
- return Some(event);
- }
- }
- }
-
- for n in (0..self.incoming_substreams.len()).rev() {
- let mut substream = self.incoming_substreams.swap_remove(n);
- match substream.poll() {
- Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
- return match message.request_id() {
- CustomMessageId::Request(id) => {
- self.pending_send_back.push((id, substream));
- Some(CustomProtoHandlerOut::CustomMessage {
- message
- })
- }
- CustomMessageId::OneWay => {
- self.shutdown.push(substream);
- Some(CustomProtoHandlerOut::CustomMessage {
- message
- })
- }
- _ => {
- self.shutdown.push(substream);
- Some(CustomProtoHandlerOut::ProtocolError {
- is_severe: true,
- error: format!("Received response in new substream").into(),
- })
- }
- }
- },
- Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) =>
- unreachable!("Cannot receive Clogged message with new protocol version; QED"),
- Ok(Async::NotReady) =>
- self.incoming_substreams.push(substream),
- Ok(Async::Ready(None)) => {}
- Err(err) => {
- self.shutdown.push(substream);
- return Some(CustomProtoHandlerOut::ProtocolError {
- is_severe: false,
- error: format!("Error in incoming substream: {}", err).into(),
- });
- }
- }
- }
-
- shutdown_list(&mut self.shutdown);
- None
- }
-}
-
/// Event that can be received by a `CustomProtoHandler`.
#[derive(Debug)]
pub enum CustomProtoHandlerIn {
@@ -414,26 +278,12 @@ where
deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
}
- } else if incoming.iter().any(|s| s.is_multiplex()) {
- let event = CustomProtoHandlerOut::CustomProtocolOpen {
- version: incoming[0].protocol_version()
- };
- self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
- ProtocolState::Normal(PerProtocolNormalState {
- outgoing_substream: None,
- incoming_substreams: incoming.into_iter().collect(),
- pending_response: SmallVec::new(),
- pending_send_back: SmallVec::new(),
- pending_messages: SmallVec::new(),
- shutdown: SmallVec::new(),
- })
-
} else {
let event = CustomProtoHandlerOut::CustomProtocolOpen {
version: incoming[0].protocol_version()
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
- ProtocolState::BackCompat {
+ ProtocolState::Normal {
substream: incoming.into_iter().next()
.expect("We have a check above that incoming isn't empty; QED"),
shutdown: SmallVec::new()
@@ -442,7 +292,6 @@ where
}
st @ ProtocolState::Opening { .. } => st,
- st @ ProtocolState::BackCompat { .. } => st,
st @ ProtocolState::Normal { .. } => st,
ProtocolState::Disabled { shutdown, .. } => {
ProtocolState::Disabled { shutdown, reenable: true }
@@ -470,7 +319,7 @@ where
ProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false }
}
- ProtocolState::BackCompat { mut substream, mut shutdown } => {
+ ProtocolState::Normal { mut substream, mut shutdown } => {
substream.shutdown();
shutdown.push(substream);
let event = CustomProtoHandlerOut::CustomProtocolClosed {
@@ -483,23 +332,6 @@ where
}
}
- ProtocolState::Normal(state) => {
- let mut out: SmallVec<[_; 6]> = SmallVec::new();
- out.extend(state.outgoing_substream.into_iter());
- out.extend(state.incoming_substreams.into_iter());
- out.extend(state.pending_response.into_iter().map(|(_, s)| s));
- out.extend(state.pending_send_back.into_iter().map(|(_, s)| s));
- for s in &mut out {
- s.shutdown();
- }
- out.extend(state.shutdown.into_iter());
- let event = CustomProtoHandlerOut::CustomProtocolClosed {
- result: Ok(())
- };
- self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
- ProtocolState::Disabled { shutdown: out, reenable: false }
- }
-
ProtocolState::Disabled { shutdown, .. } =>
ProtocolState::Disabled { shutdown, reenable: false },
};
@@ -557,25 +389,25 @@ where
}
}
- ProtocolState::BackCompat { mut substream, shutdown } => {
+ ProtocolState::Normal { mut substream, shutdown } => {
match substream.poll() {
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
let event = CustomProtoHandlerOut::CustomMessage {
message
};
return_value = Some(ProtocolsHandlerEvent::Custom(event));
- ProtocolState::BackCompat { substream, shutdown }
+ ProtocolState::Normal { substream, shutdown }
},
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => {
let event = CustomProtoHandlerOut::Clogged {
messages,
};
return_value = Some(ProtocolsHandlerEvent::Custom(event));
- ProtocolState::BackCompat { substream, shutdown }
+ ProtocolState::Normal { substream, shutdown }
}
Ok(Async::NotReady) => {
return_value = None;
- ProtocolState::BackCompat { substream, shutdown }
+ ProtocolState::Normal { substream, shutdown }
}
Ok(Async::Ready(None)) => {
let event = CustomProtoHandlerOut::CustomProtocolClosed {
@@ -600,16 +432,6 @@ where
}
}
- ProtocolState::Normal(mut norm_state) => {
- if let Some(event) = norm_state.poll() {
- return_value = Some(ProtocolsHandlerEvent::Custom(event));
- } else {
- return_value = None;
- }
-
- ProtocolState::Normal(norm_state)
- }
-
ProtocolState::Disabled { mut shutdown, reenable } => {
shutdown_list(&mut shutdown);
// If `reenable` is `true`, that means we should open the substreams system again
@@ -658,65 +480,18 @@ where
version: substream.protocol_version()
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
-
- match (substream.endpoint(), substream.is_multiplex()) {
- (Endpoint::Dialer, true) => {
- ProtocolState::Normal(PerProtocolNormalState {
- outgoing_substream: Some(substream),
- incoming_substreams: SmallVec::new(),
- pending_response: SmallVec::new(),
- pending_send_back: SmallVec::new(),
- pending_messages: SmallVec::new(),
- shutdown: SmallVec::new(),
- })
- },
- (Endpoint::Listener, true) => {
- ProtocolState::Normal(PerProtocolNormalState {
- outgoing_substream: None,
- incoming_substreams: smallvec![substream],
- pending_response: SmallVec::new(),
- pending_send_back: SmallVec::new(),
- pending_messages: SmallVec::new(),
- shutdown: SmallVec::new(),
- })
- },
- (_, false) => {
- ProtocolState::BackCompat {
- substream,
- shutdown: SmallVec::new()
- }
- },
+ ProtocolState::Normal {
+ substream,
+ shutdown: SmallVec::new()
}
}
- ProtocolState::BackCompat { substream: existing, mut shutdown } => {
+ ProtocolState::Normal { substream: existing, mut shutdown } => {
debug!(target: "sub-libp2p", "Received extra substream after having already one \
open in backwards-compatibility mode with {:?}", self.remote_peer_id);
substream.shutdown();
shutdown.push(substream);
- ProtocolState::BackCompat { substream: existing, shutdown }
- }
-
- ProtocolState::Normal(mut state) => {
- if substream.endpoint() == Endpoint::Listener {
- state.incoming_substreams.push(substream);
- } else if !state.pending_messages.is_empty() {
- let message = state.pending_messages.remove(0);
- let request_id = message.request_id();
- substream.send_message(message);
- if let CustomMessageId::Request(request_id) = request_id {
- state.pending_response.push((request_id, substream));
- } else {
- state.shutdown.push(substream);
- }
- } else {
- debug!(target: "sub-libp2p", "Opened spurious outbound substream with {:?}",
- self.remote_peer_id);
- substream.shutdown();
- state.shutdown.push(substream);
- }
-
- ProtocolState::Normal(state)
+ ProtocolState::Normal { substream: existing, shutdown }
}
ProtocolState::Disabled { mut shutdown, .. } => {
@@ -730,54 +505,9 @@ where
/// Sends a message to the remote.
fn send_message(&mut self, message: TMessage) {
match self.state {
- ProtocolState::BackCompat { ref mut substream, .. } =>
+ ProtocolState::Normal { ref mut substream, .. } =>
substream.send_message(message),
- ProtocolState::Normal(ref mut state) => {
- if let CustomMessageId::Request(request_id) = message.request_id() {
- if let Some(mut outgoing_substream) = state.outgoing_substream.take() {
- outgoing_substream.send_message(message);
- state.pending_response.push((request_id, outgoing_substream));
- } else {
- if state.pending_messages.len() >= 2048 {
- let event = CustomProtoHandlerOut::Clogged {
- messages: Vec::new(),
- };
- self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
- }
- state.pending_messages.push(message);
- self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
- protocol: SubstreamProtocol::new(self.protocol.clone()),
- info: ()
- });
- }
- } else if let CustomMessageId::Response(request_id) = message.request_id() {
- if let Some(pos) = state.pending_send_back.iter().position(|(id, _)| *id == request_id) {
- let (_, mut substream) = state.pending_send_back.remove(pos);
- substream.send_message(message);
- state.shutdown.push(substream);
- } else {
- warn!(target: "sub-libp2p", "Libp2p layer received response to a \
- non-existing request ID {:?} with {:?}", request_id, self.remote_peer_id);
- }
- } else if let Some(mut outgoing_substream) = state.outgoing_substream.take() {
- outgoing_substream.send_message(message);
- state.shutdown.push(outgoing_substream);
- } else {
- if state.pending_messages.len() >= 2048 {
- let event = CustomProtoHandlerOut::Clogged {
- messages: Vec::new(),
- };
- self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
- }
- state.pending_messages.push(message);
- self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
- protocol: SubstreamProtocol::new(self.protocol.clone()),
- info: ()
- });
- }
- }
-
_ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \
with {:?}", self.remote_peer_id)
}
@@ -844,8 +574,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
match self.state {
ProtocolState::Init { .. } | ProtocolState::Opening { .. } => {}
- ProtocolState::BackCompat { .. } | ProtocolState::Normal { .. } =>
- keep_forever = true,
+ ProtocolState::Normal { .. } => keep_forever = true,
ProtocolState::Disabled { .. } | ProtocolState::Poisoned => return KeepAlive::No,
}
diff --git a/substrate/core/network-libp2p/src/custom_proto/mod.rs b/substrate/core/network-libp2p/src/custom_proto/mod.rs
index cf2bf57153..261f710d8d 100644
--- a/substrate/core/network-libp2p/src/custom_proto/mod.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/mod.rs
@@ -15,7 +15,7 @@
// along with Substrate. If not, see .
pub use self::behaviour::{CustomProto, CustomProtoOut};
-pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol};
+pub use self::upgrade::{CustomMessage, RegisteredProtocol};
mod behaviour;
mod handler;
diff --git a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
index 00c3fc999d..bc61ff74e8 100644
--- a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
+++ b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs
@@ -19,7 +19,7 @@ use bytes::Bytes;
use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
use libp2p::tokio_codec::Framed;
use log::warn;
-use std::{collections::VecDeque, io, iter, marker::PhantomData, vec::IntoIter as VecIntoIter};
+use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter};
use futures::{prelude::*, future, stream};
use tokio_io::{AsyncRead, AsyncWrite};
use unsigned_varint::codec::UviBytes;
@@ -100,9 +100,6 @@ pub struct RegisteredProtocolSubstream {
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
/// unless the buffer empties then fills itself again.
clogged_fuse: bool,
- /// If true, then this substream uses the "/multi/" version of the protocol. This is a hint
- /// that the handler can behave differently.
- is_multiplex: bool,
/// Marker to pin the generic.
marker: PhantomData,
}
@@ -126,12 +123,6 @@ impl RegisteredProtocolSubstream {
self.endpoint
}
- /// Returns true if we negotiated the "multiplexed" version. This means that the handler can
- /// open multiple substreams instead of just one.
- pub fn is_multiplex(&self) -> bool {
- self.is_multiplex
- }
-
/// Starts a graceful shutdown process on this substream.
///
/// Note that "graceful" means that we sent a closing message. We don't wait for any
@@ -162,31 +153,9 @@ pub trait CustomMessage {
/// Tries to parse `bytes` received from the network into a message.
fn from_bytes(bytes: &[u8]) -> Result
where Self: Sized;
-
- /// Returns a unique ID that is used to match request and responses.
- ///
- /// The networking layer employs multiplexing in order to have multiple parallel data streams.
- /// Transmitting messages over the network uses two kinds of substreams:
- ///
- /// - Undirectional substreams, where we send a single message then close the substream.
- /// - Bidirectional substreams, where we send a message then wait for a response. Once the
- /// response has arrived, we close the substream.
- ///
- /// If `request_id()` returns `OneWay`, then this message will be sent or received over a
- /// unidirectional substream. If instead it returns `Request` or `Response`, then we use the
- /// value to match a request with its response.
- fn request_id(&self) -> CustomMessageId;
}
-/// See the documentation of `CustomMessage::request_id`.
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub enum CustomMessageId {
- OneWay,
- Request(u64),
- Response(u64),
-}
-
-// These trait implementations exist mostly for testing convenience. This should eventually be
+// This trait implementation exist mostly for testing convenience. This should eventually be
// removed.
impl CustomMessage for Vec {
@@ -197,45 +166,6 @@ impl CustomMessage for Vec {
fn from_bytes(bytes: &[u8]) -> Result {
Ok(bytes.to_vec())
}
-
- fn request_id(&self) -> CustomMessageId {
- CustomMessageId::OneWay
- }
-}
-
-impl CustomMessage for (Option, Vec) {
- fn into_bytes(self) -> Vec {
- use byteorder::WriteBytesExt;
- use std::io::Write;
- let mut out = Vec::new();
- out.write_u64::(self.0.unwrap_or(u64::max_value()))
- .expect("Writing to a Vec can never fail");
- out.write_all(&self.1).expect("Writing to a Vec can never fail");
- out
- }
-
- fn from_bytes(bytes: &[u8]) -> Result {
- use byteorder::ReadBytesExt;
- use std::io::Read;
- let mut rdr = std::io::Cursor::new(bytes);
- let id = rdr.read_u64::().map_err(|_| ())?;
- let mut out = Vec::new();
- rdr.read_to_end(&mut out).map_err(|_| ())?;
- let id = if id == u64::max_value() {
- None
- } else {
- Some(id)
- };
- Ok((id, out))
- }
-
- fn request_id(&self) -> CustomMessageId {
- if let Some(id) = self.0 {
- CustomMessageId::Request(id)
- } else {
- CustomMessageId::OneWay
- }
- }
}
/// Event produced by the `RegisteredProtocolSubstream`.
@@ -328,33 +258,15 @@ impl UpgradeInfo for RegisteredProtocol {
#[inline]
fn protocol_info(&self) -> Self::InfoIter {
// Report each version as an individual protocol.
- self.supported_versions.iter().flat_map(|&version| {
+ self.supported_versions.iter().map(|&version| {
let num = version.to_string();
- // Note that `name1` is the multiplex version, as we priviledge it over the old one.
- let mut name1 = self.base_name.clone();
- name1.extend_from_slice(b"multi/");
- name1.extend_from_slice(num.as_bytes());
- let proto1 = RegisteredProtocolName {
- name: name1,
+ let mut name = self.base_name.clone();
+ name.extend_from_slice(num.as_bytes());
+ RegisteredProtocolName {
+ name,
version,
- is_multiplex: true,
- };
-
- let mut name2 = self.base_name.clone();
- name2.extend_from_slice(num.as_bytes());
- let proto2 = RegisteredProtocolName {
- name: name2,
- version,
- is_multiplex: false,
- };
-
- // Important note: we prioritize the backwards compatible mode for now.
- // After some intensive testing has been done, we should switch to the new mode by
- // default.
- // Then finally we can remove the old mode after everyone has switched.
- // See https://github.com/paritytech/substrate/issues/1692
- iter::once(proto2).chain(iter::once(proto1))
+ }
}).collect::>().into_iter()
}
}
@@ -366,8 +278,6 @@ pub struct RegisteredProtocolName {
name: Bytes,
/// Version number. Stored in string form in `name`, but duplicated here for easier retrieval.
version: u8,
- /// If true, then this version is the one with the multiplexing.
- is_multiplex: bool,
}
impl ProtocolName for RegisteredProtocolName {
@@ -403,7 +313,6 @@ where TSubstream: AsyncRead + AsyncWrite,
protocol_id: self.id,
protocol_version: info.version,
clogged_fuse: false,
- is_multiplex: info.is_multiplex,
marker: PhantomData,
})
}
@@ -432,7 +341,6 @@ where TSubstream: AsyncRead + AsyncWrite,
protocol_id: self.id,
protocol_version: info.version,
clogged_fuse: false,
- is_multiplex: info.is_multiplex,
marker: PhantomData,
})
}
diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs
index 8c8b471931..639a74933d 100644
--- a/substrate/core/network-libp2p/src/lib.rs
+++ b/substrate/core/network-libp2p/src/lib.rs
@@ -15,7 +15,7 @@
// along with Substrate. If not, see .
//! Networking layer of Substrate.
-//!
+//!
//! **Important**: This crate is unstable and the API and usage may change.
//!
@@ -27,7 +27,7 @@ mod transport;
pub use crate::behaviour::Severity;
pub use crate::config::*;
-pub use crate::custom_proto::{CustomMessage, CustomMessageId, RegisteredProtocol};
+pub use crate::custom_proto::{CustomMessage, RegisteredProtocol};
pub use crate::config::{NetworkConfiguration, NodeKeyConfig, Secret, NonReservedPeerMode};
pub use crate::service_task::{start_service, Service, ServiceEvent};
pub use libp2p::{Multiaddr, multiaddr, build_multiaddr};
diff --git a/substrate/core/network-libp2p/tests/test.rs b/substrate/core/network-libp2p/tests/test.rs
index b335b7c46b..ff4d5824e0 100644
--- a/substrate/core/network-libp2p/tests/test.rs
+++ b/substrate/core/network-libp2p/tests/test.rs
@@ -200,7 +200,7 @@ fn many_nodes_connectivity() {
#[test]
fn basic_two_nodes_requests_in_parallel() {
let (mut service1, mut service2) = {
- let mut l = build_nodes::<(Option, Vec)>(2, 50550).into_iter();
+ let mut l = build_nodes::>(2, 50550).into_iter();
let a = l.next().unwrap();
let b = l.next().unwrap();
(a, b)
@@ -209,17 +209,8 @@ fn basic_two_nodes_requests_in_parallel() {
// Generate random messages with or without a request id.
let mut to_send = {
let mut to_send = Vec::new();
- let mut next_id = 0;
for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode.
- let id = if rand::random::() % 4 != 0 {
- let i = next_id;
- next_id += 1;
- Some(i)
- } else {
- None
- };
-
- let msg = (id, (0..10).map(|_| rand::random::()).collect::>());
+ let msg = (0..10).map(|_| rand::random::()).collect::>();
to_send.push(msg);
}
to_send
diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs
index 6053d12f51..b2cc5a888a 100644
--- a/substrate/core/network/src/message.rs
+++ b/substrate/core/network/src/message.rs
@@ -125,7 +125,7 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use parity_codec::{Encode, Decode};
- use network_libp2p::{CustomMessage, CustomMessageId};
+ use network_libp2p::CustomMessage;
use runtime_primitives::Justification;
use crate::config::Roles;
use super::{
@@ -213,26 +213,6 @@ pub mod generic {
fn from_bytes(bytes: &[u8]) -> Result {
Decode::decode(&mut &bytes[..]).ok_or(())
}
-
- fn request_id(&self) -> CustomMessageId {
- match *self {
- Message::Status(_) => CustomMessageId::OneWay,
- Message::BlockRequest(ref req) => CustomMessageId::Request(req.id),
- Message::BlockResponse(ref resp) => CustomMessageId::Response(resp.id),
- Message::BlockAnnounce(_) => CustomMessageId::OneWay,
- Message::Transactions(_) => CustomMessageId::OneWay,
- Message::Consensus(_) => CustomMessageId::OneWay,
- Message::RemoteCallRequest(ref req) => CustomMessageId::Request(req.id),
- Message::RemoteCallResponse(ref resp) => CustomMessageId::Response(resp.id),
- Message::RemoteReadRequest(ref req) => CustomMessageId::Request(req.id),
- Message::RemoteReadResponse(ref resp) => CustomMessageId::Response(resp.id),
- Message::RemoteHeaderRequest(ref req) => CustomMessageId::Request(req.id),
- Message::RemoteHeaderResponse(ref resp) => CustomMessageId::Response(resp.id),
- Message::RemoteChangesRequest(ref req) => CustomMessageId::Request(req.id),
- Message::RemoteChangesResponse(ref resp) => CustomMessageId::Response(resp.id),
- Message::ChainSpecific(_) => CustomMessageId::OneWay,
- }
- }
}
/// Status sent on connection.