mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 17:01:09 +00:00
Remove the multiplexed networking system (#2373)
* Remove the multiplexed networking system * Rename BackCompat to Normal * Remove CustomMessageId * Fix tests
This commit is contained in:
committed by
Bastian Köcher
parent
0f02bed702
commit
f3df7250e9
@@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use crate::custom_proto::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol};
|
||||
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
|
||||
use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream};
|
||||
use futures::prelude::*;
|
||||
use libp2p::core::{
|
||||
@@ -25,8 +25,8 @@ use libp2p::core::{
|
||||
protocols_handler::SubstreamProtocol,
|
||||
upgrade::{InboundUpgrade, OutboundUpgrade}
|
||||
};
|
||||
use log::{debug, error, warn};
|
||||
use smallvec::{smallvec, SmallVec};
|
||||
use log::{debug, error};
|
||||
use smallvec::SmallVec;
|
||||
use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_timer::{Delay, clock::Clock};
|
||||
@@ -69,26 +69,17 @@ use void::Void;
|
||||
///
|
||||
/// ## How it works
|
||||
///
|
||||
/// For backwards compatibility reasons, the behaviour of the handler is quite complicated. After
|
||||
/// enough nodes have upgraded, it should be simplified by using helpers provided by libp2p.
|
||||
///
|
||||
/// When the handler is created, it is initially in the `Init` state and waits for either a
|
||||
/// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to
|
||||
/// toggle the handler between the disabled and enabled states.
|
||||
///
|
||||
/// When the handler is enabled for the first time, if it is the dialer of the connection, it tries
|
||||
/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx` or a
|
||||
/// protocol named `/substrate/multi/xxx`. If it is the former, then we are in
|
||||
/// "backwards-compatibility mode". If it is the latter, we are in normal operation mode.
|
||||
/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx`, where
|
||||
/// `xxx` is chosen by the user.
|
||||
///
|
||||
/// In "backwards-compatibility mode", we have one unique substream where bidirectional
|
||||
/// communications happen. If the remote closes the substream, we consider that we are now
|
||||
/// disconnected. Re-enabling is performed by re-opening the substream.
|
||||
///
|
||||
/// In normal operation mode, each request gets sent over a different substream where the response
|
||||
/// is then sent back. If the remote refuses one of our substream open request, or if an error
|
||||
/// happens on one substream, we consider that we are disconnected. Re-enabling is performed by
|
||||
/// opening an outbound substream.
|
||||
/// Then, we have one unique substream where bidirectional communications happen. If the remote
|
||||
/// closes the substream, we consider that we are now disconnected. Re-enabling is performed by
|
||||
/// re-opening the substream (even if we are not the dialer of the connection).
|
||||
///
|
||||
pub struct CustomProtoHandlerProto<TMessage, TSubstream> {
|
||||
/// Configuration for the protocol upgrade to negotiate.
|
||||
@@ -159,7 +150,6 @@ pub struct CustomProtoHandler<TMessage, TSubstream> {
|
||||
|
||||
/// `Clock` instance that uses the current execution context's source of time.
|
||||
clock: Clock,
|
||||
|
||||
}
|
||||
|
||||
/// State of the handler.
|
||||
@@ -181,17 +171,13 @@ enum ProtocolState<TMessage, TSubstream> {
|
||||
|
||||
/// Backwards-compatible mode. Contains the unique substream that is open.
|
||||
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
|
||||
BackCompat {
|
||||
Normal {
|
||||
/// The unique substream where bidirectional communications happen.
|
||||
substream: RegisteredProtocolSubstream<TMessage, TSubstream>,
|
||||
/// Contains substreams which are being shut down.
|
||||
shutdown: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
|
||||
},
|
||||
|
||||
/// Normal functionning. Contains the substreams that are open.
|
||||
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
|
||||
Normal(PerProtocolNormalState<TMessage, TSubstream>),
|
||||
|
||||
/// We are disabled. Contains substreams that are being closed.
|
||||
/// If we are in this state, either we have sent a `CustomProtocolClosed` message to the
|
||||
/// outside or we have never sent any `CustomProtocolOpen` in the first place.
|
||||
@@ -212,128 +198,6 @@ enum ProtocolState<TMessage, TSubstream> {
|
||||
Poisoned,
|
||||
}
|
||||
|
||||
/// Normal functionning mode for a protocol.
|
||||
struct PerProtocolNormalState<TMessage, TSubstream> {
|
||||
/// Optional substream that we opened.
|
||||
outgoing_substream: Option<RegisteredProtocolSubstream<TMessage, TSubstream>>,
|
||||
|
||||
/// Substreams that have been opened by the remote. We are waiting for a packet from it.
|
||||
incoming_substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
|
||||
|
||||
/// For each request that has been sent to the remote, contains the substream where we
|
||||
/// expect a response.
|
||||
pending_response: SmallVec<[(u64, RegisteredProtocolSubstream<TMessage, TSubstream>); 4]>,
|
||||
|
||||
/// For each request received by the remote, contains the substream where to send back our
|
||||
/// response. Once a response has been sent, the substream closes.
|
||||
pending_send_back: SmallVec<[(u64, RegisteredProtocolSubstream<TMessage, TSubstream>); 4]>,
|
||||
|
||||
/// List of messages waiting for a substream to open in order to be sent.
|
||||
pending_messages: SmallVec<[TMessage; 6]>,
|
||||
|
||||
/// Contains substreams which are being shut down.
|
||||
shutdown: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
|
||||
}
|
||||
|
||||
impl<TMessage, TSubstream> PerProtocolNormalState<TMessage, TSubstream>
|
||||
where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite {
|
||||
/// Polls for things that are new. Same API constraints as `Future::poll()`.
|
||||
/// Optionally returns the event to produce.
|
||||
/// You must pass the `protocol_id` as we need have to inject it in the returned event.
|
||||
/// API note: Ideally we wouldn't need to be passed a `ProtocolId`, and we would return a
|
||||
/// different enum that doesn't contain any `protocol_id`, and the caller would inject
|
||||
/// the ID itself, but that's a ton of code for not much gain.
|
||||
fn poll(&mut self) -> Option<CustomProtoHandlerOut<TMessage>> {
|
||||
for n in (0..self.pending_response.len()).rev() {
|
||||
let (request_id, mut substream) = self.pending_response.swap_remove(n);
|
||||
match substream.poll() {
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
|
||||
if message.request_id() == CustomMessageId::Response(request_id) {
|
||||
let event = CustomProtoHandlerOut::CustomMessage {
|
||||
message
|
||||
};
|
||||
self.shutdown.push(substream);
|
||||
return Some(event);
|
||||
} else {
|
||||
self.shutdown.push(substream);
|
||||
let event = CustomProtoHandlerOut::ProtocolError {
|
||||
is_severe: true,
|
||||
error: format!("Request ID doesn't match substream: expected {:?}, \
|
||||
got {:?}", request_id, message.request_id()).into(),
|
||||
};
|
||||
return Some(event);
|
||||
}
|
||||
},
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) =>
|
||||
unreachable!("Cannot receive Clogged message with new protocol version; QED"),
|
||||
Ok(Async::NotReady) =>
|
||||
self.pending_response.push((request_id, substream)),
|
||||
Ok(Async::Ready(None)) => {
|
||||
self.shutdown.push(substream);
|
||||
let event = CustomProtoHandlerOut::ProtocolError {
|
||||
is_severe: false,
|
||||
error: format!("Request ID {:?} didn't receive an answer", request_id).into(),
|
||||
};
|
||||
return Some(event);
|
||||
}
|
||||
Err(err) => {
|
||||
self.shutdown.push(substream);
|
||||
let event = CustomProtoHandlerOut::ProtocolError {
|
||||
is_severe: false,
|
||||
error: format!("Error while waiting for an answer for {:?}: {}",
|
||||
request_id, err).into(),
|
||||
};
|
||||
return Some(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for n in (0..self.incoming_substreams.len()).rev() {
|
||||
let mut substream = self.incoming_substreams.swap_remove(n);
|
||||
match substream.poll() {
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
|
||||
return match message.request_id() {
|
||||
CustomMessageId::Request(id) => {
|
||||
self.pending_send_back.push((id, substream));
|
||||
Some(CustomProtoHandlerOut::CustomMessage {
|
||||
message
|
||||
})
|
||||
}
|
||||
CustomMessageId::OneWay => {
|
||||
self.shutdown.push(substream);
|
||||
Some(CustomProtoHandlerOut::CustomMessage {
|
||||
message
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
self.shutdown.push(substream);
|
||||
Some(CustomProtoHandlerOut::ProtocolError {
|
||||
is_severe: true,
|
||||
error: format!("Received response in new substream").into(),
|
||||
})
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) =>
|
||||
unreachable!("Cannot receive Clogged message with new protocol version; QED"),
|
||||
Ok(Async::NotReady) =>
|
||||
self.incoming_substreams.push(substream),
|
||||
Ok(Async::Ready(None)) => {}
|
||||
Err(err) => {
|
||||
self.shutdown.push(substream);
|
||||
return Some(CustomProtoHandlerOut::ProtocolError {
|
||||
is_severe: false,
|
||||
error: format!("Error in incoming substream: {}", err).into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
shutdown_list(&mut self.shutdown);
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Event that can be received by a `CustomProtoHandler`.
|
||||
#[derive(Debug)]
|
||||
pub enum CustomProtoHandlerIn<TMessage> {
|
||||
@@ -414,26 +278,12 @@ where
|
||||
deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
|
||||
}
|
||||
|
||||
} else if incoming.iter().any(|s| s.is_multiplex()) {
|
||||
let event = CustomProtoHandlerOut::CustomProtocolOpen {
|
||||
version: incoming[0].protocol_version()
|
||||
};
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::Normal(PerProtocolNormalState {
|
||||
outgoing_substream: None,
|
||||
incoming_substreams: incoming.into_iter().collect(),
|
||||
pending_response: SmallVec::new(),
|
||||
pending_send_back: SmallVec::new(),
|
||||
pending_messages: SmallVec::new(),
|
||||
shutdown: SmallVec::new(),
|
||||
})
|
||||
|
||||
} else {
|
||||
let event = CustomProtoHandlerOut::CustomProtocolOpen {
|
||||
version: incoming[0].protocol_version()
|
||||
};
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::BackCompat {
|
||||
ProtocolState::Normal {
|
||||
substream: incoming.into_iter().next()
|
||||
.expect("We have a check above that incoming isn't empty; QED"),
|
||||
shutdown: SmallVec::new()
|
||||
@@ -442,7 +292,6 @@ where
|
||||
}
|
||||
|
||||
st @ ProtocolState::Opening { .. } => st,
|
||||
st @ ProtocolState::BackCompat { .. } => st,
|
||||
st @ ProtocolState::Normal { .. } => st,
|
||||
ProtocolState::Disabled { shutdown, .. } => {
|
||||
ProtocolState::Disabled { shutdown, reenable: true }
|
||||
@@ -470,7 +319,7 @@ where
|
||||
ProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false }
|
||||
}
|
||||
|
||||
ProtocolState::BackCompat { mut substream, mut shutdown } => {
|
||||
ProtocolState::Normal { mut substream, mut shutdown } => {
|
||||
substream.shutdown();
|
||||
shutdown.push(substream);
|
||||
let event = CustomProtoHandlerOut::CustomProtocolClosed {
|
||||
@@ -483,23 +332,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
ProtocolState::Normal(state) => {
|
||||
let mut out: SmallVec<[_; 6]> = SmallVec::new();
|
||||
out.extend(state.outgoing_substream.into_iter());
|
||||
out.extend(state.incoming_substreams.into_iter());
|
||||
out.extend(state.pending_response.into_iter().map(|(_, s)| s));
|
||||
out.extend(state.pending_send_back.into_iter().map(|(_, s)| s));
|
||||
for s in &mut out {
|
||||
s.shutdown();
|
||||
}
|
||||
out.extend(state.shutdown.into_iter());
|
||||
let event = CustomProtoHandlerOut::CustomProtocolClosed {
|
||||
result: Ok(())
|
||||
};
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::Disabled { shutdown: out, reenable: false }
|
||||
}
|
||||
|
||||
ProtocolState::Disabled { shutdown, .. } =>
|
||||
ProtocolState::Disabled { shutdown, reenable: false },
|
||||
};
|
||||
@@ -557,25 +389,25 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
ProtocolState::BackCompat { mut substream, shutdown } => {
|
||||
ProtocolState::Normal { mut substream, shutdown } => {
|
||||
match substream.poll() {
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
|
||||
let event = CustomProtoHandlerOut::CustomMessage {
|
||||
message
|
||||
};
|
||||
return_value = Some(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::BackCompat { substream, shutdown }
|
||||
ProtocolState::Normal { substream, shutdown }
|
||||
},
|
||||
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => {
|
||||
let event = CustomProtoHandlerOut::Clogged {
|
||||
messages,
|
||||
};
|
||||
return_value = Some(ProtocolsHandlerEvent::Custom(event));
|
||||
ProtocolState::BackCompat { substream, shutdown }
|
||||
ProtocolState::Normal { substream, shutdown }
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
return_value = None;
|
||||
ProtocolState::BackCompat { substream, shutdown }
|
||||
ProtocolState::Normal { substream, shutdown }
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
let event = CustomProtoHandlerOut::CustomProtocolClosed {
|
||||
@@ -600,16 +432,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
ProtocolState::Normal(mut norm_state) => {
|
||||
if let Some(event) = norm_state.poll() {
|
||||
return_value = Some(ProtocolsHandlerEvent::Custom(event));
|
||||
} else {
|
||||
return_value = None;
|
||||
}
|
||||
|
||||
ProtocolState::Normal(norm_state)
|
||||
}
|
||||
|
||||
ProtocolState::Disabled { mut shutdown, reenable } => {
|
||||
shutdown_list(&mut shutdown);
|
||||
// If `reenable` is `true`, that means we should open the substreams system again
|
||||
@@ -658,65 +480,18 @@ where
|
||||
version: substream.protocol_version()
|
||||
};
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
|
||||
|
||||
match (substream.endpoint(), substream.is_multiplex()) {
|
||||
(Endpoint::Dialer, true) => {
|
||||
ProtocolState::Normal(PerProtocolNormalState {
|
||||
outgoing_substream: Some(substream),
|
||||
incoming_substreams: SmallVec::new(),
|
||||
pending_response: SmallVec::new(),
|
||||
pending_send_back: SmallVec::new(),
|
||||
pending_messages: SmallVec::new(),
|
||||
shutdown: SmallVec::new(),
|
||||
})
|
||||
},
|
||||
(Endpoint::Listener, true) => {
|
||||
ProtocolState::Normal(PerProtocolNormalState {
|
||||
outgoing_substream: None,
|
||||
incoming_substreams: smallvec![substream],
|
||||
pending_response: SmallVec::new(),
|
||||
pending_send_back: SmallVec::new(),
|
||||
pending_messages: SmallVec::new(),
|
||||
shutdown: SmallVec::new(),
|
||||
})
|
||||
},
|
||||
(_, false) => {
|
||||
ProtocolState::BackCompat {
|
||||
substream,
|
||||
shutdown: SmallVec::new()
|
||||
}
|
||||
},
|
||||
ProtocolState::Normal {
|
||||
substream,
|
||||
shutdown: SmallVec::new()
|
||||
}
|
||||
}
|
||||
|
||||
ProtocolState::BackCompat { substream: existing, mut shutdown } => {
|
||||
ProtocolState::Normal { substream: existing, mut shutdown } => {
|
||||
debug!(target: "sub-libp2p", "Received extra substream after having already one \
|
||||
open in backwards-compatibility mode with {:?}", self.remote_peer_id);
|
||||
substream.shutdown();
|
||||
shutdown.push(substream);
|
||||
ProtocolState::BackCompat { substream: existing, shutdown }
|
||||
}
|
||||
|
||||
ProtocolState::Normal(mut state) => {
|
||||
if substream.endpoint() == Endpoint::Listener {
|
||||
state.incoming_substreams.push(substream);
|
||||
} else if !state.pending_messages.is_empty() {
|
||||
let message = state.pending_messages.remove(0);
|
||||
let request_id = message.request_id();
|
||||
substream.send_message(message);
|
||||
if let CustomMessageId::Request(request_id) = request_id {
|
||||
state.pending_response.push((request_id, substream));
|
||||
} else {
|
||||
state.shutdown.push(substream);
|
||||
}
|
||||
} else {
|
||||
debug!(target: "sub-libp2p", "Opened spurious outbound substream with {:?}",
|
||||
self.remote_peer_id);
|
||||
substream.shutdown();
|
||||
state.shutdown.push(substream);
|
||||
}
|
||||
|
||||
ProtocolState::Normal(state)
|
||||
ProtocolState::Normal { substream: existing, shutdown }
|
||||
}
|
||||
|
||||
ProtocolState::Disabled { mut shutdown, .. } => {
|
||||
@@ -730,54 +505,9 @@ where
|
||||
/// Sends a message to the remote.
|
||||
fn send_message(&mut self, message: TMessage) {
|
||||
match self.state {
|
||||
ProtocolState::BackCompat { ref mut substream, .. } =>
|
||||
ProtocolState::Normal { ref mut substream, .. } =>
|
||||
substream.send_message(message),
|
||||
|
||||
ProtocolState::Normal(ref mut state) => {
|
||||
if let CustomMessageId::Request(request_id) = message.request_id() {
|
||||
if let Some(mut outgoing_substream) = state.outgoing_substream.take() {
|
||||
outgoing_substream.send_message(message);
|
||||
state.pending_response.push((request_id, outgoing_substream));
|
||||
} else {
|
||||
if state.pending_messages.len() >= 2048 {
|
||||
let event = CustomProtoHandlerOut::Clogged {
|
||||
messages: Vec::new(),
|
||||
};
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
|
||||
}
|
||||
state.pending_messages.push(message);
|
||||
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(self.protocol.clone()),
|
||||
info: ()
|
||||
});
|
||||
}
|
||||
} else if let CustomMessageId::Response(request_id) = message.request_id() {
|
||||
if let Some(pos) = state.pending_send_back.iter().position(|(id, _)| *id == request_id) {
|
||||
let (_, mut substream) = state.pending_send_back.remove(pos);
|
||||
substream.send_message(message);
|
||||
state.shutdown.push(substream);
|
||||
} else {
|
||||
warn!(target: "sub-libp2p", "Libp2p layer received response to a \
|
||||
non-existing request ID {:?} with {:?}", request_id, self.remote_peer_id);
|
||||
}
|
||||
} else if let Some(mut outgoing_substream) = state.outgoing_substream.take() {
|
||||
outgoing_substream.send_message(message);
|
||||
state.shutdown.push(outgoing_substream);
|
||||
} else {
|
||||
if state.pending_messages.len() >= 2048 {
|
||||
let event = CustomProtoHandlerOut::Clogged {
|
||||
messages: Vec::new(),
|
||||
};
|
||||
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
|
||||
}
|
||||
state.pending_messages.push(message);
|
||||
self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest {
|
||||
protocol: SubstreamProtocol::new(self.protocol.clone()),
|
||||
info: ()
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
_ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \
|
||||
with {:?}", self.remote_peer_id)
|
||||
}
|
||||
@@ -844,8 +574,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage {
|
||||
|
||||
match self.state {
|
||||
ProtocolState::Init { .. } | ProtocolState::Opening { .. } => {}
|
||||
ProtocolState::BackCompat { .. } | ProtocolState::Normal { .. } =>
|
||||
keep_forever = true,
|
||||
ProtocolState::Normal { .. } => keep_forever = true,
|
||||
ProtocolState::Disabled { .. } | ProtocolState::Poisoned => return KeepAlive::No,
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
pub use self::behaviour::{CustomProto, CustomProtoOut};
|
||||
pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol};
|
||||
pub use self::upgrade::{CustomMessage, RegisteredProtocol};
|
||||
|
||||
mod behaviour;
|
||||
mod handler;
|
||||
|
||||
@@ -19,7 +19,7 @@ use bytes::Bytes;
|
||||
use libp2p::core::{Negotiated, Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName};
|
||||
use libp2p::tokio_codec::Framed;
|
||||
use log::warn;
|
||||
use std::{collections::VecDeque, io, iter, marker::PhantomData, vec::IntoIter as VecIntoIter};
|
||||
use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter};
|
||||
use futures::{prelude::*, future, stream};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use unsigned_varint::codec::UviBytes;
|
||||
@@ -100,9 +100,6 @@ pub struct RegisteredProtocolSubstream<TMessage, TSubstream> {
|
||||
/// If true, we have sent a "remote is clogged" event recently and shouldn't send another one
|
||||
/// unless the buffer empties then fills itself again.
|
||||
clogged_fuse: bool,
|
||||
/// If true, then this substream uses the "/multi/" version of the protocol. This is a hint
|
||||
/// that the handler can behave differently.
|
||||
is_multiplex: bool,
|
||||
/// Marker to pin the generic.
|
||||
marker: PhantomData<TMessage>,
|
||||
}
|
||||
@@ -126,12 +123,6 @@ impl<TMessage, TSubstream> RegisteredProtocolSubstream<TMessage, TSubstream> {
|
||||
self.endpoint
|
||||
}
|
||||
|
||||
/// Returns true if we negotiated the "multiplexed" version. This means that the handler can
|
||||
/// open multiple substreams instead of just one.
|
||||
pub fn is_multiplex(&self) -> bool {
|
||||
self.is_multiplex
|
||||
}
|
||||
|
||||
/// Starts a graceful shutdown process on this substream.
|
||||
///
|
||||
/// Note that "graceful" means that we sent a closing message. We don't wait for any
|
||||
@@ -162,31 +153,9 @@ pub trait CustomMessage {
|
||||
/// Tries to parse `bytes` received from the network into a message.
|
||||
fn from_bytes(bytes: &[u8]) -> Result<Self, ()>
|
||||
where Self: Sized;
|
||||
|
||||
/// Returns a unique ID that is used to match request and responses.
|
||||
///
|
||||
/// The networking layer employs multiplexing in order to have multiple parallel data streams.
|
||||
/// Transmitting messages over the network uses two kinds of substreams:
|
||||
///
|
||||
/// - Undirectional substreams, where we send a single message then close the substream.
|
||||
/// - Bidirectional substreams, where we send a message then wait for a response. Once the
|
||||
/// response has arrived, we close the substream.
|
||||
///
|
||||
/// If `request_id()` returns `OneWay`, then this message will be sent or received over a
|
||||
/// unidirectional substream. If instead it returns `Request` or `Response`, then we use the
|
||||
/// value to match a request with its response.
|
||||
fn request_id(&self) -> CustomMessageId;
|
||||
}
|
||||
|
||||
/// See the documentation of `CustomMessage::request_id`.
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||
pub enum CustomMessageId {
|
||||
OneWay,
|
||||
Request(u64),
|
||||
Response(u64),
|
||||
}
|
||||
|
||||
// These trait implementations exist mostly for testing convenience. This should eventually be
|
||||
// This trait implementation exist mostly for testing convenience. This should eventually be
|
||||
// removed.
|
||||
|
||||
impl CustomMessage for Vec<u8> {
|
||||
@@ -197,45 +166,6 @@ impl CustomMessage for Vec<u8> {
|
||||
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
|
||||
Ok(bytes.to_vec())
|
||||
}
|
||||
|
||||
fn request_id(&self) -> CustomMessageId {
|
||||
CustomMessageId::OneWay
|
||||
}
|
||||
}
|
||||
|
||||
impl CustomMessage for (Option<u64>, Vec<u8>) {
|
||||
fn into_bytes(self) -> Vec<u8> {
|
||||
use byteorder::WriteBytesExt;
|
||||
use std::io::Write;
|
||||
let mut out = Vec::new();
|
||||
out.write_u64::<byteorder::BigEndian>(self.0.unwrap_or(u64::max_value()))
|
||||
.expect("Writing to a Vec can never fail");
|
||||
out.write_all(&self.1).expect("Writing to a Vec can never fail");
|
||||
out
|
||||
}
|
||||
|
||||
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
|
||||
use byteorder::ReadBytesExt;
|
||||
use std::io::Read;
|
||||
let mut rdr = std::io::Cursor::new(bytes);
|
||||
let id = rdr.read_u64::<byteorder::BigEndian>().map_err(|_| ())?;
|
||||
let mut out = Vec::new();
|
||||
rdr.read_to_end(&mut out).map_err(|_| ())?;
|
||||
let id = if id == u64::max_value() {
|
||||
None
|
||||
} else {
|
||||
Some(id)
|
||||
};
|
||||
Ok((id, out))
|
||||
}
|
||||
|
||||
fn request_id(&self) -> CustomMessageId {
|
||||
if let Some(id) = self.0 {
|
||||
CustomMessageId::Request(id)
|
||||
} else {
|
||||
CustomMessageId::OneWay
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Event produced by the `RegisteredProtocolSubstream`.
|
||||
@@ -328,33 +258,15 @@ impl<TMessage> UpgradeInfo for RegisteredProtocol<TMessage> {
|
||||
#[inline]
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
// Report each version as an individual protocol.
|
||||
self.supported_versions.iter().flat_map(|&version| {
|
||||
self.supported_versions.iter().map(|&version| {
|
||||
let num = version.to_string();
|
||||
|
||||
// Note that `name1` is the multiplex version, as we priviledge it over the old one.
|
||||
let mut name1 = self.base_name.clone();
|
||||
name1.extend_from_slice(b"multi/");
|
||||
name1.extend_from_slice(num.as_bytes());
|
||||
let proto1 = RegisteredProtocolName {
|
||||
name: name1,
|
||||
let mut name = self.base_name.clone();
|
||||
name.extend_from_slice(num.as_bytes());
|
||||
RegisteredProtocolName {
|
||||
name,
|
||||
version,
|
||||
is_multiplex: true,
|
||||
};
|
||||
|
||||
let mut name2 = self.base_name.clone();
|
||||
name2.extend_from_slice(num.as_bytes());
|
||||
let proto2 = RegisteredProtocolName {
|
||||
name: name2,
|
||||
version,
|
||||
is_multiplex: false,
|
||||
};
|
||||
|
||||
// Important note: we prioritize the backwards compatible mode for now.
|
||||
// After some intensive testing has been done, we should switch to the new mode by
|
||||
// default.
|
||||
// Then finally we can remove the old mode after everyone has switched.
|
||||
// See https://github.com/paritytech/substrate/issues/1692
|
||||
iter::once(proto2).chain(iter::once(proto1))
|
||||
}
|
||||
}).collect::<Vec<_>>().into_iter()
|
||||
}
|
||||
}
|
||||
@@ -366,8 +278,6 @@ pub struct RegisteredProtocolName {
|
||||
name: Bytes,
|
||||
/// Version number. Stored in string form in `name`, but duplicated here for easier retrieval.
|
||||
version: u8,
|
||||
/// If true, then this version is the one with the multiplexing.
|
||||
is_multiplex: bool,
|
||||
}
|
||||
|
||||
impl ProtocolName for RegisteredProtocolName {
|
||||
@@ -403,7 +313,6 @@ where TSubstream: AsyncRead + AsyncWrite,
|
||||
protocol_id: self.id,
|
||||
protocol_version: info.version,
|
||||
clogged_fuse: false,
|
||||
is_multiplex: info.is_multiplex,
|
||||
marker: PhantomData,
|
||||
})
|
||||
}
|
||||
@@ -432,7 +341,6 @@ where TSubstream: AsyncRead + AsyncWrite,
|
||||
protocol_id: self.id,
|
||||
protocol_version: info.version,
|
||||
clogged_fuse: false,
|
||||
is_multiplex: info.is_multiplex,
|
||||
marker: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
//! Networking layer of Substrate.
|
||||
//!
|
||||
//!
|
||||
//! **Important**: This crate is unstable and the API and usage may change.
|
||||
//!
|
||||
|
||||
@@ -27,7 +27,7 @@ mod transport;
|
||||
|
||||
pub use crate::behaviour::Severity;
|
||||
pub use crate::config::*;
|
||||
pub use crate::custom_proto::{CustomMessage, CustomMessageId, RegisteredProtocol};
|
||||
pub use crate::custom_proto::{CustomMessage, RegisteredProtocol};
|
||||
pub use crate::config::{NetworkConfiguration, NodeKeyConfig, Secret, NonReservedPeerMode};
|
||||
pub use crate::service_task::{start_service, Service, ServiceEvent};
|
||||
pub use libp2p::{Multiaddr, multiaddr, build_multiaddr};
|
||||
|
||||
@@ -200,7 +200,7 @@ fn many_nodes_connectivity() {
|
||||
#[test]
|
||||
fn basic_two_nodes_requests_in_parallel() {
|
||||
let (mut service1, mut service2) = {
|
||||
let mut l = build_nodes::<(Option<u64>, Vec<u8>)>(2, 50550).into_iter();
|
||||
let mut l = build_nodes::<Vec<u8>>(2, 50550).into_iter();
|
||||
let a = l.next().unwrap();
|
||||
let b = l.next().unwrap();
|
||||
(a, b)
|
||||
@@ -209,17 +209,8 @@ fn basic_two_nodes_requests_in_parallel() {
|
||||
// Generate random messages with or without a request id.
|
||||
let mut to_send = {
|
||||
let mut to_send = Vec::new();
|
||||
let mut next_id = 0;
|
||||
for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode.
|
||||
let id = if rand::random::<usize>() % 4 != 0 {
|
||||
let i = next_id;
|
||||
next_id += 1;
|
||||
Some(i)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let msg = (id, (0..10).map(|_| rand::random::<u8>()).collect::<Vec<_>>());
|
||||
let msg = (0..10).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
|
||||
to_send.push(msg);
|
||||
}
|
||||
to_send
|
||||
|
||||
@@ -125,7 +125,7 @@ pub struct RemoteReadResponse {
|
||||
/// Generic types.
|
||||
pub mod generic {
|
||||
use parity_codec::{Encode, Decode};
|
||||
use network_libp2p::{CustomMessage, CustomMessageId};
|
||||
use network_libp2p::CustomMessage;
|
||||
use runtime_primitives::Justification;
|
||||
use crate::config::Roles;
|
||||
use super::{
|
||||
@@ -213,26 +213,6 @@ pub mod generic {
|
||||
fn from_bytes(bytes: &[u8]) -> Result<Self, ()> {
|
||||
Decode::decode(&mut &bytes[..]).ok_or(())
|
||||
}
|
||||
|
||||
fn request_id(&self) -> CustomMessageId {
|
||||
match *self {
|
||||
Message::Status(_) => CustomMessageId::OneWay,
|
||||
Message::BlockRequest(ref req) => CustomMessageId::Request(req.id),
|
||||
Message::BlockResponse(ref resp) => CustomMessageId::Response(resp.id),
|
||||
Message::BlockAnnounce(_) => CustomMessageId::OneWay,
|
||||
Message::Transactions(_) => CustomMessageId::OneWay,
|
||||
Message::Consensus(_) => CustomMessageId::OneWay,
|
||||
Message::RemoteCallRequest(ref req) => CustomMessageId::Request(req.id),
|
||||
Message::RemoteCallResponse(ref resp) => CustomMessageId::Response(resp.id),
|
||||
Message::RemoteReadRequest(ref req) => CustomMessageId::Request(req.id),
|
||||
Message::RemoteReadResponse(ref resp) => CustomMessageId::Response(resp.id),
|
||||
Message::RemoteHeaderRequest(ref req) => CustomMessageId::Request(req.id),
|
||||
Message::RemoteHeaderResponse(ref resp) => CustomMessageId::Response(resp.id),
|
||||
Message::RemoteChangesRequest(ref req) => CustomMessageId::Request(req.id),
|
||||
Message::RemoteChangesResponse(ref resp) => CustomMessageId::Response(resp.id),
|
||||
Message::ChainSpecific(_) => CustomMessageId::OneWay,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Status sent on connection.
|
||||
|
||||
Reference in New Issue
Block a user