From f3df7250e9b777dea88f78b7a9401d189afcad62 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 25 Apr 2019 11:39:57 +0200 Subject: [PATCH] Remove the multiplexed networking system (#2373) * Remove the multiplexed networking system * Rename BackCompat to Normal * Remove CustomMessageId * Fix tests --- .../src/custom_proto/handler.rs | 315 ++---------------- .../network-libp2p/src/custom_proto/mod.rs | 2 +- .../src/custom_proto/upgrade.rs | 108 +----- substrate/core/network-libp2p/src/lib.rs | 4 +- substrate/core/network-libp2p/tests/test.rs | 13 +- substrate/core/network/src/message.rs | 22 +- 6 files changed, 36 insertions(+), 428 deletions(-) diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs index beac1dec25..7c0ec61344 100644 --- a/substrate/core/network-libp2p/src/custom_proto/handler.rs +++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::custom_proto::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol}; +use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol}; use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream}; use futures::prelude::*; use libp2p::core::{ @@ -25,8 +25,8 @@ use libp2p::core::{ protocols_handler::SubstreamProtocol, upgrade::{InboundUpgrade, OutboundUpgrade} }; -use log::{debug, error, warn}; -use smallvec::{smallvec, SmallVec}; +use log::{debug, error}; +use smallvec::SmallVec; use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, clock::Clock}; @@ -69,26 +69,17 @@ use void::Void; /// /// ## How it works /// -/// For backwards compatibility reasons, the behaviour of the handler is quite complicated. After -/// enough nodes have upgraded, it should be simplified by using helpers provided by libp2p. -/// /// When the handler is created, it is initially in the `Init` state and waits for either a /// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to /// toggle the handler between the disabled and enabled states. /// /// When the handler is enabled for the first time, if it is the dialer of the connection, it tries -/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx` or a -/// protocol named `/substrate/multi/xxx`. If it is the former, then we are in -/// "backwards-compatibility mode". If it is the latter, we are in normal operation mode. +/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx`, where +/// `xxx` is chosen by the user. /// -/// In "backwards-compatibility mode", we have one unique substream where bidirectional -/// communications happen. If the remote closes the substream, we consider that we are now -/// disconnected. Re-enabling is performed by re-opening the substream. -/// -/// In normal operation mode, each request gets sent over a different substream where the response -/// is then sent back. If the remote refuses one of our substream open request, or if an error -/// happens on one substream, we consider that we are disconnected. Re-enabling is performed by -/// opening an outbound substream. +/// Then, we have one unique substream where bidirectional communications happen. If the remote +/// closes the substream, we consider that we are now disconnected. Re-enabling is performed by +/// re-opening the substream (even if we are not the dialer of the connection). /// pub struct CustomProtoHandlerProto { /// Configuration for the protocol upgrade to negotiate. @@ -159,7 +150,6 @@ pub struct CustomProtoHandler { /// `Clock` instance that uses the current execution context's source of time. clock: Clock, - } /// State of the handler. @@ -181,17 +171,13 @@ enum ProtocolState { /// Backwards-compatible mode. Contains the unique substream that is open. /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. - BackCompat { + Normal { /// The unique substream where bidirectional communications happen. substream: RegisteredProtocolSubstream, /// Contains substreams which are being shut down. shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, }, - /// Normal functionning. Contains the substreams that are open. - /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. - Normal(PerProtocolNormalState), - /// We are disabled. Contains substreams that are being closed. /// If we are in this state, either we have sent a `CustomProtocolClosed` message to the /// outside or we have never sent any `CustomProtocolOpen` in the first place. @@ -212,128 +198,6 @@ enum ProtocolState { Poisoned, } -/// Normal functionning mode for a protocol. -struct PerProtocolNormalState { - /// Optional substream that we opened. - outgoing_substream: Option>, - - /// Substreams that have been opened by the remote. We are waiting for a packet from it. - incoming_substreams: SmallVec<[RegisteredProtocolSubstream; 4]>, - - /// For each request that has been sent to the remote, contains the substream where we - /// expect a response. - pending_response: SmallVec<[(u64, RegisteredProtocolSubstream); 4]>, - - /// For each request received by the remote, contains the substream where to send back our - /// response. Once a response has been sent, the substream closes. - pending_send_back: SmallVec<[(u64, RegisteredProtocolSubstream); 4]>, - - /// List of messages waiting for a substream to open in order to be sent. - pending_messages: SmallVec<[TMessage; 6]>, - - /// Contains substreams which are being shut down. - shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, -} - -impl PerProtocolNormalState -where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { - /// Polls for things that are new. Same API constraints as `Future::poll()`. - /// Optionally returns the event to produce. - /// You must pass the `protocol_id` as we need have to inject it in the returned event. - /// API note: Ideally we wouldn't need to be passed a `ProtocolId`, and we would return a - /// different enum that doesn't contain any `protocol_id`, and the caller would inject - /// the ID itself, but that's a ton of code for not much gain. - fn poll(&mut self) -> Option> { - for n in (0..self.pending_response.len()).rev() { - let (request_id, mut substream) = self.pending_response.swap_remove(n); - match substream.poll() { - Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { - if message.request_id() == CustomMessageId::Response(request_id) { - let event = CustomProtoHandlerOut::CustomMessage { - message - }; - self.shutdown.push(substream); - return Some(event); - } else { - self.shutdown.push(substream); - let event = CustomProtoHandlerOut::ProtocolError { - is_severe: true, - error: format!("Request ID doesn't match substream: expected {:?}, \ - got {:?}", request_id, message.request_id()).into(), - }; - return Some(event); - } - }, - Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) => - unreachable!("Cannot receive Clogged message with new protocol version; QED"), - Ok(Async::NotReady) => - self.pending_response.push((request_id, substream)), - Ok(Async::Ready(None)) => { - self.shutdown.push(substream); - let event = CustomProtoHandlerOut::ProtocolError { - is_severe: false, - error: format!("Request ID {:?} didn't receive an answer", request_id).into(), - }; - return Some(event); - } - Err(err) => { - self.shutdown.push(substream); - let event = CustomProtoHandlerOut::ProtocolError { - is_severe: false, - error: format!("Error while waiting for an answer for {:?}: {}", - request_id, err).into(), - }; - return Some(event); - } - } - } - - for n in (0..self.incoming_substreams.len()).rev() { - let mut substream = self.incoming_substreams.swap_remove(n); - match substream.poll() { - Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { - return match message.request_id() { - CustomMessageId::Request(id) => { - self.pending_send_back.push((id, substream)); - Some(CustomProtoHandlerOut::CustomMessage { - message - }) - } - CustomMessageId::OneWay => { - self.shutdown.push(substream); - Some(CustomProtoHandlerOut::CustomMessage { - message - }) - } - _ => { - self.shutdown.push(substream); - Some(CustomProtoHandlerOut::ProtocolError { - is_severe: true, - error: format!("Received response in new substream").into(), - }) - } - } - }, - Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) => - unreachable!("Cannot receive Clogged message with new protocol version; QED"), - Ok(Async::NotReady) => - self.incoming_substreams.push(substream), - Ok(Async::Ready(None)) => {} - Err(err) => { - self.shutdown.push(substream); - return Some(CustomProtoHandlerOut::ProtocolError { - is_severe: false, - error: format!("Error in incoming substream: {}", err).into(), - }); - } - } - } - - shutdown_list(&mut self.shutdown); - None - } -} - /// Event that can be received by a `CustomProtoHandler`. #[derive(Debug)] pub enum CustomProtoHandlerIn { @@ -414,26 +278,12 @@ where deadline: Delay::new(self.clock.now() + Duration::from_secs(60)) } - } else if incoming.iter().any(|s| s.is_multiplex()) { - let event = CustomProtoHandlerOut::CustomProtocolOpen { - version: incoming[0].protocol_version() - }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::Normal(PerProtocolNormalState { - outgoing_substream: None, - incoming_substreams: incoming.into_iter().collect(), - pending_response: SmallVec::new(), - pending_send_back: SmallVec::new(), - pending_messages: SmallVec::new(), - shutdown: SmallVec::new(), - }) - } else { let event = CustomProtoHandlerOut::CustomProtocolOpen { version: incoming[0].protocol_version() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::BackCompat { + ProtocolState::Normal { substream: incoming.into_iter().next() .expect("We have a check above that incoming isn't empty; QED"), shutdown: SmallVec::new() @@ -442,7 +292,6 @@ where } st @ ProtocolState::Opening { .. } => st, - st @ ProtocolState::BackCompat { .. } => st, st @ ProtocolState::Normal { .. } => st, ProtocolState::Disabled { shutdown, .. } => { ProtocolState::Disabled { shutdown, reenable: true } @@ -470,7 +319,7 @@ where ProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false } } - ProtocolState::BackCompat { mut substream, mut shutdown } => { + ProtocolState::Normal { mut substream, mut shutdown } => { substream.shutdown(); shutdown.push(substream); let event = CustomProtoHandlerOut::CustomProtocolClosed { @@ -483,23 +332,6 @@ where } } - ProtocolState::Normal(state) => { - let mut out: SmallVec<[_; 6]> = SmallVec::new(); - out.extend(state.outgoing_substream.into_iter()); - out.extend(state.incoming_substreams.into_iter()); - out.extend(state.pending_response.into_iter().map(|(_, s)| s)); - out.extend(state.pending_send_back.into_iter().map(|(_, s)| s)); - for s in &mut out { - s.shutdown(); - } - out.extend(state.shutdown.into_iter()); - let event = CustomProtoHandlerOut::CustomProtocolClosed { - result: Ok(()) - }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::Disabled { shutdown: out, reenable: false } - } - ProtocolState::Disabled { shutdown, .. } => ProtocolState::Disabled { shutdown, reenable: false }, }; @@ -557,25 +389,25 @@ where } } - ProtocolState::BackCompat { mut substream, shutdown } => { + ProtocolState::Normal { mut substream, shutdown } => { match substream.poll() { Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { let event = CustomProtoHandlerOut::CustomMessage { message }; return_value = Some(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::BackCompat { substream, shutdown } + ProtocolState::Normal { substream, shutdown } }, Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => { let event = CustomProtoHandlerOut::Clogged { messages, }; return_value = Some(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::BackCompat { substream, shutdown } + ProtocolState::Normal { substream, shutdown } } Ok(Async::NotReady) => { return_value = None; - ProtocolState::BackCompat { substream, shutdown } + ProtocolState::Normal { substream, shutdown } } Ok(Async::Ready(None)) => { let event = CustomProtoHandlerOut::CustomProtocolClosed { @@ -600,16 +432,6 @@ where } } - ProtocolState::Normal(mut norm_state) => { - if let Some(event) = norm_state.poll() { - return_value = Some(ProtocolsHandlerEvent::Custom(event)); - } else { - return_value = None; - } - - ProtocolState::Normal(norm_state) - } - ProtocolState::Disabled { mut shutdown, reenable } => { shutdown_list(&mut shutdown); // If `reenable` is `true`, that means we should open the substreams system again @@ -658,65 +480,18 @@ where version: substream.protocol_version() }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - - match (substream.endpoint(), substream.is_multiplex()) { - (Endpoint::Dialer, true) => { - ProtocolState::Normal(PerProtocolNormalState { - outgoing_substream: Some(substream), - incoming_substreams: SmallVec::new(), - pending_response: SmallVec::new(), - pending_send_back: SmallVec::new(), - pending_messages: SmallVec::new(), - shutdown: SmallVec::new(), - }) - }, - (Endpoint::Listener, true) => { - ProtocolState::Normal(PerProtocolNormalState { - outgoing_substream: None, - incoming_substreams: smallvec![substream], - pending_response: SmallVec::new(), - pending_send_back: SmallVec::new(), - pending_messages: SmallVec::new(), - shutdown: SmallVec::new(), - }) - }, - (_, false) => { - ProtocolState::BackCompat { - substream, - shutdown: SmallVec::new() - } - }, + ProtocolState::Normal { + substream, + shutdown: SmallVec::new() } } - ProtocolState::BackCompat { substream: existing, mut shutdown } => { + ProtocolState::Normal { substream: existing, mut shutdown } => { debug!(target: "sub-libp2p", "Received extra substream after having already one \ open in backwards-compatibility mode with {:?}", self.remote_peer_id); substream.shutdown(); shutdown.push(substream); - ProtocolState::BackCompat { substream: existing, shutdown } - } - - ProtocolState::Normal(mut state) => { - if substream.endpoint() == Endpoint::Listener { - state.incoming_substreams.push(substream); - } else if !state.pending_messages.is_empty() { - let message = state.pending_messages.remove(0); - let request_id = message.request_id(); - substream.send_message(message); - if let CustomMessageId::Request(request_id) = request_id { - state.pending_response.push((request_id, substream)); - } else { - state.shutdown.push(substream); - } - } else { - debug!(target: "sub-libp2p", "Opened spurious outbound substream with {:?}", - self.remote_peer_id); - substream.shutdown(); - state.shutdown.push(substream); - } - - ProtocolState::Normal(state) + ProtocolState::Normal { substream: existing, shutdown } } ProtocolState::Disabled { mut shutdown, .. } => { @@ -730,54 +505,9 @@ where /// Sends a message to the remote. fn send_message(&mut self, message: TMessage) { match self.state { - ProtocolState::BackCompat { ref mut substream, .. } => + ProtocolState::Normal { ref mut substream, .. } => substream.send_message(message), - ProtocolState::Normal(ref mut state) => { - if let CustomMessageId::Request(request_id) = message.request_id() { - if let Some(mut outgoing_substream) = state.outgoing_substream.take() { - outgoing_substream.send_message(message); - state.pending_response.push((request_id, outgoing_substream)); - } else { - if state.pending_messages.len() >= 2048 { - let event = CustomProtoHandlerOut::Clogged { - messages: Vec::new(), - }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - } - state.pending_messages.push(message); - self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.protocol.clone()), - info: () - }); - } - } else if let CustomMessageId::Response(request_id) = message.request_id() { - if let Some(pos) = state.pending_send_back.iter().position(|(id, _)| *id == request_id) { - let (_, mut substream) = state.pending_send_back.remove(pos); - substream.send_message(message); - state.shutdown.push(substream); - } else { - warn!(target: "sub-libp2p", "Libp2p layer received response to a \ - non-existing request ID {:?} with {:?}", request_id, self.remote_peer_id); - } - } else if let Some(mut outgoing_substream) = state.outgoing_substream.take() { - outgoing_substream.send_message(message); - state.shutdown.push(outgoing_substream); - } else { - if state.pending_messages.len() >= 2048 { - let event = CustomProtoHandlerOut::Clogged { - messages: Vec::new(), - }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - } - state.pending_messages.push(message); - self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.protocol.clone()), - info: () - }); - } - } - _ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \ with {:?}", self.remote_peer_id) } @@ -844,8 +574,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { match self.state { ProtocolState::Init { .. } | ProtocolState::Opening { .. } => {} - ProtocolState::BackCompat { .. } | ProtocolState::Normal { .. } => - keep_forever = true, + ProtocolState::Normal { .. } => keep_forever = true, ProtocolState::Disabled { .. } | ProtocolState::Poisoned => return KeepAlive::No, } diff --git a/substrate/core/network-libp2p/src/custom_proto/mod.rs b/substrate/core/network-libp2p/src/custom_proto/mod.rs index cf2bf57153..261f710d8d 100644 --- a/substrate/core/network-libp2p/src/custom_proto/mod.rs +++ b/substrate/core/network-libp2p/src/custom_proto/mod.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . pub use self::behaviour::{CustomProto, CustomProtoOut}; -pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol}; +pub use self::upgrade::{CustomMessage, RegisteredProtocol}; mod behaviour; mod handler; diff --git a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs index 00c3fc999d..bc61ff74e8 100644 --- a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs +++ b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs @@ -19,7 +19,7 @@ use bytes::Bytes; use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; use libp2p::tokio_codec::Framed; use log::warn; -use std::{collections::VecDeque, io, iter, marker::PhantomData, vec::IntoIter as VecIntoIter}; +use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter}; use futures::{prelude::*, future, stream}; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec::UviBytes; @@ -100,9 +100,6 @@ pub struct RegisteredProtocolSubstream { /// If true, we have sent a "remote is clogged" event recently and shouldn't send another one /// unless the buffer empties then fills itself again. clogged_fuse: bool, - /// If true, then this substream uses the "/multi/" version of the protocol. This is a hint - /// that the handler can behave differently. - is_multiplex: bool, /// Marker to pin the generic. marker: PhantomData, } @@ -126,12 +123,6 @@ impl RegisteredProtocolSubstream { self.endpoint } - /// Returns true if we negotiated the "multiplexed" version. This means that the handler can - /// open multiple substreams instead of just one. - pub fn is_multiplex(&self) -> bool { - self.is_multiplex - } - /// Starts a graceful shutdown process on this substream. /// /// Note that "graceful" means that we sent a closing message. We don't wait for any @@ -162,31 +153,9 @@ pub trait CustomMessage { /// Tries to parse `bytes` received from the network into a message. fn from_bytes(bytes: &[u8]) -> Result where Self: Sized; - - /// Returns a unique ID that is used to match request and responses. - /// - /// The networking layer employs multiplexing in order to have multiple parallel data streams. - /// Transmitting messages over the network uses two kinds of substreams: - /// - /// - Undirectional substreams, where we send a single message then close the substream. - /// - Bidirectional substreams, where we send a message then wait for a response. Once the - /// response has arrived, we close the substream. - /// - /// If `request_id()` returns `OneWay`, then this message will be sent or received over a - /// unidirectional substream. If instead it returns `Request` or `Response`, then we use the - /// value to match a request with its response. - fn request_id(&self) -> CustomMessageId; } -/// See the documentation of `CustomMessage::request_id`. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum CustomMessageId { - OneWay, - Request(u64), - Response(u64), -} - -// These trait implementations exist mostly for testing convenience. This should eventually be +// This trait implementation exist mostly for testing convenience. This should eventually be // removed. impl CustomMessage for Vec { @@ -197,45 +166,6 @@ impl CustomMessage for Vec { fn from_bytes(bytes: &[u8]) -> Result { Ok(bytes.to_vec()) } - - fn request_id(&self) -> CustomMessageId { - CustomMessageId::OneWay - } -} - -impl CustomMessage for (Option, Vec) { - fn into_bytes(self) -> Vec { - use byteorder::WriteBytesExt; - use std::io::Write; - let mut out = Vec::new(); - out.write_u64::(self.0.unwrap_or(u64::max_value())) - .expect("Writing to a Vec can never fail"); - out.write_all(&self.1).expect("Writing to a Vec can never fail"); - out - } - - fn from_bytes(bytes: &[u8]) -> Result { - use byteorder::ReadBytesExt; - use std::io::Read; - let mut rdr = std::io::Cursor::new(bytes); - let id = rdr.read_u64::().map_err(|_| ())?; - let mut out = Vec::new(); - rdr.read_to_end(&mut out).map_err(|_| ())?; - let id = if id == u64::max_value() { - None - } else { - Some(id) - }; - Ok((id, out)) - } - - fn request_id(&self) -> CustomMessageId { - if let Some(id) = self.0 { - CustomMessageId::Request(id) - } else { - CustomMessageId::OneWay - } - } } /// Event produced by the `RegisteredProtocolSubstream`. @@ -328,33 +258,15 @@ impl UpgradeInfo for RegisteredProtocol { #[inline] fn protocol_info(&self) -> Self::InfoIter { // Report each version as an individual protocol. - self.supported_versions.iter().flat_map(|&version| { + self.supported_versions.iter().map(|&version| { let num = version.to_string(); - // Note that `name1` is the multiplex version, as we priviledge it over the old one. - let mut name1 = self.base_name.clone(); - name1.extend_from_slice(b"multi/"); - name1.extend_from_slice(num.as_bytes()); - let proto1 = RegisteredProtocolName { - name: name1, + let mut name = self.base_name.clone(); + name.extend_from_slice(num.as_bytes()); + RegisteredProtocolName { + name, version, - is_multiplex: true, - }; - - let mut name2 = self.base_name.clone(); - name2.extend_from_slice(num.as_bytes()); - let proto2 = RegisteredProtocolName { - name: name2, - version, - is_multiplex: false, - }; - - // Important note: we prioritize the backwards compatible mode for now. - // After some intensive testing has been done, we should switch to the new mode by - // default. - // Then finally we can remove the old mode after everyone has switched. - // See https://github.com/paritytech/substrate/issues/1692 - iter::once(proto2).chain(iter::once(proto1)) + } }).collect::>().into_iter() } } @@ -366,8 +278,6 @@ pub struct RegisteredProtocolName { name: Bytes, /// Version number. Stored in string form in `name`, but duplicated here for easier retrieval. version: u8, - /// If true, then this version is the one with the multiplexing. - is_multiplex: bool, } impl ProtocolName for RegisteredProtocolName { @@ -403,7 +313,6 @@ where TSubstream: AsyncRead + AsyncWrite, protocol_id: self.id, protocol_version: info.version, clogged_fuse: false, - is_multiplex: info.is_multiplex, marker: PhantomData, }) } @@ -432,7 +341,6 @@ where TSubstream: AsyncRead + AsyncWrite, protocol_id: self.id, protocol_version: info.version, clogged_fuse: false, - is_multiplex: info.is_multiplex, marker: PhantomData, }) } diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index 8c8b471931..639a74933d 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . //! Networking layer of Substrate. -//! +//! //! **Important**: This crate is unstable and the API and usage may change. //! @@ -27,7 +27,7 @@ mod transport; pub use crate::behaviour::Severity; pub use crate::config::*; -pub use crate::custom_proto::{CustomMessage, CustomMessageId, RegisteredProtocol}; +pub use crate::custom_proto::{CustomMessage, RegisteredProtocol}; pub use crate::config::{NetworkConfiguration, NodeKeyConfig, Secret, NonReservedPeerMode}; pub use crate::service_task::{start_service, Service, ServiceEvent}; pub use libp2p::{Multiaddr, multiaddr, build_multiaddr}; diff --git a/substrate/core/network-libp2p/tests/test.rs b/substrate/core/network-libp2p/tests/test.rs index b335b7c46b..ff4d5824e0 100644 --- a/substrate/core/network-libp2p/tests/test.rs +++ b/substrate/core/network-libp2p/tests/test.rs @@ -200,7 +200,7 @@ fn many_nodes_connectivity() { #[test] fn basic_two_nodes_requests_in_parallel() { let (mut service1, mut service2) = { - let mut l = build_nodes::<(Option, Vec)>(2, 50550).into_iter(); + let mut l = build_nodes::>(2, 50550).into_iter(); let a = l.next().unwrap(); let b = l.next().unwrap(); (a, b) @@ -209,17 +209,8 @@ fn basic_two_nodes_requests_in_parallel() { // Generate random messages with or without a request id. let mut to_send = { let mut to_send = Vec::new(); - let mut next_id = 0; for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode. - let id = if rand::random::() % 4 != 0 { - let i = next_id; - next_id += 1; - Some(i) - } else { - None - }; - - let msg = (id, (0..10).map(|_| rand::random::()).collect::>()); + let msg = (0..10).map(|_| rand::random::()).collect::>(); to_send.push(msg); } to_send diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs index 6053d12f51..b2cc5a888a 100644 --- a/substrate/core/network/src/message.rs +++ b/substrate/core/network/src/message.rs @@ -125,7 +125,7 @@ pub struct RemoteReadResponse { /// Generic types. pub mod generic { use parity_codec::{Encode, Decode}; - use network_libp2p::{CustomMessage, CustomMessageId}; + use network_libp2p::CustomMessage; use runtime_primitives::Justification; use crate::config::Roles; use super::{ @@ -213,26 +213,6 @@ pub mod generic { fn from_bytes(bytes: &[u8]) -> Result { Decode::decode(&mut &bytes[..]).ok_or(()) } - - fn request_id(&self) -> CustomMessageId { - match *self { - Message::Status(_) => CustomMessageId::OneWay, - Message::BlockRequest(ref req) => CustomMessageId::Request(req.id), - Message::BlockResponse(ref resp) => CustomMessageId::Response(resp.id), - Message::BlockAnnounce(_) => CustomMessageId::OneWay, - Message::Transactions(_) => CustomMessageId::OneWay, - Message::Consensus(_) => CustomMessageId::OneWay, - Message::RemoteCallRequest(ref req) => CustomMessageId::Request(req.id), - Message::RemoteCallResponse(ref resp) => CustomMessageId::Response(resp.id), - Message::RemoteReadRequest(ref req) => CustomMessageId::Request(req.id), - Message::RemoteReadResponse(ref resp) => CustomMessageId::Response(resp.id), - Message::RemoteHeaderRequest(ref req) => CustomMessageId::Request(req.id), - Message::RemoteHeaderResponse(ref resp) => CustomMessageId::Response(resp.id), - Message::RemoteChangesRequest(ref req) => CustomMessageId::Request(req.id), - Message::RemoteChangesResponse(ref resp) => CustomMessageId::Response(resp.id), - Message::ChainSpecific(_) => CustomMessageId::OneWay, - } - } } /// Status sent on connection.