diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index d5f8498b5f..a6dbdf8571 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1624,7 +1624,7 @@ dependencies = [ "libp2p-core 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", - "yamux 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "yamux 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3938,6 +3938,7 @@ dependencies = [ name = "substrate-network-libp2p" version = "0.1.0" dependencies = [ + "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4643,7 +4644,7 @@ name = "twox-hash" version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -4950,7 +4951,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "yamux" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5355,4 +5356,4 @@ dependencies = [ "checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" "checksum xdg 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d089681aa106a86fade1b0128fb5daf07d5867a509ab036d99988dec80429a57" "checksum yaml-rust 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992" -"checksum yamux 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "e25561b512df3c287cf52404cab0b07ea43d095cb96230e9e2cb635db72d75f0" +"checksum yamux 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "56626765982b12c2f4b59529e1d2ce0a7c25499865e6edf8b502dceb51b65fe2" diff --git a/substrate/core/network-libp2p/Cargo.toml b/substrate/core/network-libp2p/Cargo.toml index 045487bcc9..ab784ac978 100644 --- a/substrate/core/network-libp2p/Cargo.toml +++ b/substrate/core/network-libp2p/Cargo.toml @@ -8,6 +8,7 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] +byteorder = "1.3" bytes = "0.4" error-chain = { version = "0.12", default-features = false } fnv = "1.0" diff --git a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs index 6a69587eea..d946fe700e 100644 --- a/substrate/core/network-libp2p/src/custom_proto/behaviour.rs +++ b/substrate/core/network-libp2p/src/custom_proto/behaviour.rs @@ -22,7 +22,7 @@ use crate::parse_str_addr; use fnv::{FnvHashMap, FnvHashSet}; use futures::prelude::*; use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p::core::{protocols_handler::ProtocolsHandler, Multiaddr, PeerId}; +use libp2p::core::{protocols_handler::ProtocolsHandler, Endpoint, Multiaddr, PeerId}; use log::{debug, trace, warn}; use smallvec::SmallVec; use std::{cmp, error, io, marker::PhantomData, path::Path, time::Duration, time::Instant}; @@ -458,13 +458,13 @@ where trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (active)", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::EnableActive, + event: CustomProtosHandlerIn::Enable(Endpoint::Dialer), }); } else { trace!(target: "sub-libp2p", "Enabling custom protocols with {:?} (passive)", peer_id); self.events.push(NetworkBehaviourAction::SendEvent { peer_id: peer_id.clone(), - event: CustomProtosHandlerIn::EnablePassive, + event: CustomProtosHandlerIn::Enable(Endpoint::Listener), }); } @@ -581,10 +581,15 @@ where messages, })); } - CustomProtosHandlerOut::ProtocolError { protocol_id, error } => { - warn!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \ - {:?}: {:?}", source, protocol_id, error); - self.ban_peer(source); + CustomProtosHandlerOut::ProtocolError { protocol_id, error, is_severe } => { + if is_severe { + warn!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \ + {:?}: {:?}", source, protocol_id, error); + self.ban_peer(source); + } else { + debug!(target: "sub-libp2p", "Network misbehaviour from {:?} with protocol \ + {:?}: {:?}", source, protocol_id, error); + } } } } diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs index 8feedc52e4..d7b8da5f74 100644 --- a/substrate/core/network-libp2p/src/custom_proto/handler.rs +++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs @@ -15,71 +15,542 @@ // along with Substrate. If not, see . use crate::ProtocolId; -use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol, RegisteredProtocols}; -use crate::custom_proto::upgrade::{RegisteredProtocolSubstream, RegisteredProtocolEvent}; +use crate::custom_proto::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol, RegisteredProtocols}; +use crate::custom_proto::upgrade::{RegisteredProtocolEvent, RegisteredProtocolSubstream}; use futures::prelude::*; use libp2p::core::{ - ProtocolsHandler, ProtocolsHandlerEvent, + Endpoint, ProtocolsHandler, ProtocolsHandlerEvent, protocols_handler::KeepAlive, protocols_handler::ProtocolsHandlerUpgrErr, upgrade::{InboundUpgrade, OutboundUpgrade} }; -use log::trace; -use smallvec::SmallVec; -use std::{fmt, io, time::Duration, time::Instant}; +use log::{debug, error, warn}; +use smallvec::{smallvec, SmallVec}; +use std::{error, fmt, io, mem, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use void::Void; -/// Protocol handler that tries to maintain one substream per registered custom protocol. +/// Implements the `ProtocolsHandler` trait of libp2p. +/// +/// Every time a connection with a remote is established, an instance of this struct is created and +/// sent to a background task dedicated to this connection. It handles all communications that are +/// specific to Substrate. +/// +/// Note that there can be multiple instance of this struct simultaneously for same peer. However +/// if that happens, only one main instance can communicate with the outer layers of the code. +/// +/// ## How it works +/// +/// For backwards compatibility reasons, the behaviour of the handler is quite complicated. After +/// enough nodes have upgraded, it should be simplified by using helpers provided by libp2p. +/// +/// When the handler is created, it is initially in the `Init` state and waits for either a +/// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to +/// toggle the handler between the disabled and enabled states. +/// +/// When the handler is enabled for the first time, if it is the dialer of the connection, it tries +/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx` or a +/// protocol named `/substrate/multi/xxx`. If it is the former, then we are in +/// "backwards-compatibility mode". If it is the latter, we are in normal operation mode. +/// +/// In "backwards-compatibility mode", we have one unique substream where bidirectional +/// communications happen. If the remote closes the substream, we consider that we are now +/// disconnected. Re-enabling is performed by re-opening the substream. +/// +/// In normal operation mode, each request gets sent over a different substream where the response +/// is then sent back. If the remote refuses one of our substream open request, or if an error +/// happens on one substream, we consider that we are disconnected. Re-enabling is performed by +/// opening an outbound substream. /// -/// The handler initially starts in the "Disable" state. It can then be enabled by sending an -/// `Enable` message. -/// The handler can then be enabled and disabled at any time with the `Enable` and `Disable` -/// messages. pub struct CustomProtosHandler { - /// List of all the protocols we support. - protocols: RegisteredProtocols, - - /// See the documentation of `State`. - state: State, - - /// Value to be returned by `connection_keep_alive()`. - keep_alive: KeepAlive, - - /// The active substreams. There should always ever be only one substream per protocol. - substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, + /// Fields individual to each protocol that we support. + protocols: SmallVec<[PerProtocol; 1]>, /// Queue of events to send to the outside. + /// + /// This queue must only ever be modified to insert elements at the back, or remove the first + /// element. events_queue: SmallVec<[ProtocolsHandlerEvent, ProtocolId, CustomProtosHandlerOut>; 16]>, + + /// We have a warm-up period after creating the handler during which we don't shut down the + /// connection. + warm_up_end: Instant, } -/// State of the handler. -enum State { +/// Fields individual to each protocol that we support. +struct PerProtocol { + /// Configuration for the protocol upgrade to negotiate. + protocol: RegisteredProtocol, + + /// State of the communications with the remote. + state: PerProtocolState, +} + +/// State of the handler for a specific protocol. +enum PerProtocolState { /// Waiting for the behaviour to tell the handler whether it is enabled or disabled. - /// Contains a list of substreams opened by the remote and that we will integrate to - /// `substreams` only if we get enabled. + /// Contains a list of substreams opened by the remote but that haven't been processed yet. Init(SmallVec<[RegisteredProtocolSubstream; 6]>), - /// Normal functionning. - Normal, + /// Handler is opening a substream in order to activate itself. + /// If we are in this state, we haven't sent any `CustomProtocolOpen` yet. + Opening, - /// We are disabled. We close existing substreams and refuse incoming connections, but don't - /// shut down the entire handler. - Disabled, + /// Backwards-compatible mode. Contains the unique substream that is open. + /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. + BackCompat { + /// The unique substream where bidirectional communications happen. + substream: RegisteredProtocolSubstream, + /// Contains substreams which are being shut down. + shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, + }, - /// We are trying to shut down the existing node and thus should refuse any incoming - /// connection. - ShuttingDown, + /// Normal functionning. Contains the substreams that are open. + /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. + Normal(PerProtocolNormalState), + + /// We are disabled. Contains substreams that are being closed. + /// If we are in this state, either we have sent a `CustomProtocolClosed` message to the + /// outside or we have never sent any `CustomProtocolOpen` in the first place. + Disabled { + /// List of substreams to shut down. + shutdown: SmallVec<[RegisteredProtocolSubstream; 6]>, + + /// If true, we should reactivate the handler after all the substreams in `shutdown` have + /// been closed. + /// + /// Since we don't want to mix old and new substreams, we wait for all old substreams to + /// be closed before opening any new one. + reenable: bool, + }, + + /// We are trying to shut down the connection and thus should refuse any incoming connection. + /// Contains substreams that are being closed. Once all the substreams are closed, we close + /// the connection. + ShuttingDown(SmallVec<[RegisteredProtocolSubstream; 6]>), + + /// We sometimes temporarily switch to this state during processing. If we are in this state + /// at the beginning of a method, that means something bad happend in the source code. + Poisoned, +} + +/// Normal functionning mode for a protocol. +struct PerProtocolNormalState { + /// Optional substream that we opened. + outgoing_substream: Option>, + + /// Substreams that have been opened by the remote. We are waiting for a packet from it. + incoming_substreams: SmallVec<[RegisteredProtocolSubstream; 4]>, + + /// For each request that has been sent to the remote, contains the substream where we + /// expect a response. + pending_response: SmallVec<[(u64, RegisteredProtocolSubstream); 4]>, + + /// For each request received by the remote, contains the substream where to send back our + /// response. Once a response has been sent, the substream closes. + pending_send_back: SmallVec<[(u64, RegisteredProtocolSubstream); 4]>, + + /// List of messages waiting for a substream to open in order to be sent. + pending_messages: SmallVec<[TMessage; 6]>, + + /// Contains substreams which are being shut down. + shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, +} + +impl PerProtocol +where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { + /// Enables the protocol. Returns an optional event to emit. + /// Must be passed the endpoint of the connection. + #[must_use] + fn enable(&mut self, endpoint: Endpoint) + -> Option, ProtocolId, CustomProtosHandlerOut>> { + + let return_value; + + self.state = match mem::replace(&mut self.state, PerProtocolState::Poisoned) { + PerProtocolState::Poisoned => { + error!(target: "sub-libp2p", "Handler is in poisoned state"); + return_value = None; + PerProtocolState::Poisoned + } + + PerProtocolState::Init(incoming) => { + if incoming.is_empty() { + if let Endpoint::Dialer = endpoint { + return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade: self.protocol.clone(), + info: self.protocol.id(), + }); + } else { + return_value = None; + } + PerProtocolState::Opening + + } else if incoming.iter().any(|s| s.is_multiplex()) { + let event = CustomProtosHandlerOut::CustomProtocolOpen { + protocol_id: self.protocol.id(), + version: incoming[0].protocol_version() + }; + return_value = Some(ProtocolsHandlerEvent::Custom(event)); + PerProtocolState::Normal(PerProtocolNormalState { + outgoing_substream: None, + incoming_substreams: incoming.into_iter().collect(), + pending_response: SmallVec::new(), + pending_send_back: SmallVec::new(), + pending_messages: SmallVec::new(), + shutdown: SmallVec::new(), + }) + + } else { + let event = CustomProtosHandlerOut::CustomProtocolOpen { + protocol_id: self.protocol.id(), + version: incoming[0].protocol_version() + }; + return_value = Some(ProtocolsHandlerEvent::Custom(event)); + PerProtocolState::BackCompat { + substream: incoming.into_iter().next() + .expect("We have a check above that incoming isn't empty; QED"), + shutdown: SmallVec::new() + } + } + } + + st @ PerProtocolState::Opening => { return_value = None; st } + st @ PerProtocolState::BackCompat { .. } => { return_value = None; st } + st @ PerProtocolState::Normal { .. } => { return_value = None; st } + PerProtocolState::Disabled { shutdown, .. } => { + return_value = None; + PerProtocolState::Disabled { shutdown, reenable: true } + } + PerProtocolState::ShuttingDown(list) => { + return_value = None; + PerProtocolState::ShuttingDown(list) + } + }; + + return_value + } + + /// Disables the protocol. Returns `true` if the protocol was closed, `false` if it was already + /// closed or not open yet. + fn disable(&mut self) -> bool { + let mut return_value = false; + + self.state = match mem::replace(&mut self.state, PerProtocolState::Poisoned) { + PerProtocolState::Poisoned => { + error!(target: "sub-libp2p", "Handler is in poisoned state"); + PerProtocolState::Poisoned + } + + PerProtocolState::Init(mut shutdown) => { + for s in &mut shutdown { + s.shutdown(); + } + PerProtocolState::Disabled { shutdown, reenable: false } + } + + PerProtocolState::Opening => { + PerProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false } + } + + PerProtocolState::BackCompat { mut substream, mut shutdown } => { + substream.shutdown(); + shutdown.push(substream); + return_value = true; + PerProtocolState::Disabled { + shutdown: shutdown.into_iter().collect(), + reenable: false + } + } + + PerProtocolState::Normal(state) => { + let mut out: SmallVec<[_; 6]> = SmallVec::new(); + out.extend(state.outgoing_substream.into_iter()); + out.extend(state.incoming_substreams.into_iter()); + out.extend(state.pending_response.into_iter().map(|(_, s)| s)); + out.extend(state.pending_send_back.into_iter().map(|(_, s)| s)); + for s in &mut out { + s.shutdown(); + } + out.extend(state.shutdown.into_iter()); + return_value = true; + PerProtocolState::Disabled { shutdown: out, reenable: false } + } + + PerProtocolState::Disabled { shutdown, .. } => + PerProtocolState::Disabled { shutdown, reenable: false }, + PerProtocolState::ShuttingDown(list) => + PerProtocolState::ShuttingDown(list), + }; + + return_value + } + + /// Shuts down all the substream. Returns `true` if the protocol was closed, `false` if it was + /// already closed or not open yet. + fn shutdown(&mut self) -> bool { + let mut return_value = false; + self.state = match mem::replace(&mut self.state, PerProtocolState::Poisoned) { + PerProtocolState::Poisoned => { + error!(target: "sub-libp2p", "Handler is in poisoned state"); + PerProtocolState::Poisoned + } + + PerProtocolState::Init(mut list) => { + for s in &mut list { s.shutdown(); } + PerProtocolState::ShuttingDown(list) + } + + PerProtocolState::Opening => { + PerProtocolState::ShuttingDown(SmallVec::new()) + } + + PerProtocolState::BackCompat { mut substream, mut shutdown } => { + substream.shutdown(); + shutdown.push(substream); + return_value = true; + PerProtocolState::ShuttingDown(shutdown.into_iter().collect()) + } + + PerProtocolState::Normal(state) => { + let mut out: SmallVec<[_; 6]> = SmallVec::new(); + out.extend(state.outgoing_substream.into_iter()); + out.extend(state.incoming_substreams.into_iter()); + out.extend(state.pending_response.into_iter().map(|(_, s)| s)); + out.extend(state.pending_send_back.into_iter().map(|(_, s)| s)); + for s in &mut out { + s.shutdown(); + } + out.extend(state.shutdown.into_iter()); + return_value = true; + PerProtocolState::ShuttingDown(out) + } + + PerProtocolState::Disabled { shutdown, .. } => + PerProtocolState::ShuttingDown(shutdown), + PerProtocolState::ShuttingDown(list) => + PerProtocolState::ShuttingDown(list), + }; + return_value + } + + /// Polls the state for events. Optionally returns an event to produce. + #[must_use] + fn poll(&mut self) + -> Option, ProtocolId, CustomProtosHandlerOut>> { + + let return_value; + self.state = match mem::replace(&mut self.state, PerProtocolState::Poisoned) { + PerProtocolState::Poisoned => { + error!(target: "sub-libp2p", "Handler is in poisoned state; shutting down"); + return_value = Some(ProtocolsHandlerEvent::Shutdown); + PerProtocolState::Poisoned + } + + st @ PerProtocolState::Init(_) => { + return_value = None; + st + } + + st @ PerProtocolState::Opening { .. } => { + return_value = None; + st + } + + PerProtocolState::BackCompat { mut substream, shutdown } => { + match substream.poll() { + Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { + let event = CustomProtosHandlerOut::CustomMessage { + protocol_id: substream.protocol_id(), + message + }; + return_value = Some(ProtocolsHandlerEvent::Custom(event)); + PerProtocolState::BackCompat { substream, shutdown } + }, + Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => { + let event = CustomProtosHandlerOut::Clogged { + protocol_id: substream.protocol_id(), + messages, + }; + return_value = Some(ProtocolsHandlerEvent::Custom(event)); + PerProtocolState::BackCompat { substream, shutdown } + } + Ok(Async::NotReady) => { + return_value = None; + PerProtocolState::BackCompat { substream, shutdown } + } + Ok(Async::Ready(None)) => { + let event = CustomProtosHandlerOut::CustomProtocolClosed { + protocol_id: substream.protocol_id(), + result: Ok(()) + }; + return_value = Some(ProtocolsHandlerEvent::Custom(event)); + PerProtocolState::Disabled { + shutdown: shutdown.into_iter().collect(), + reenable: false + } + } + Err(err) => { + let event = CustomProtosHandlerOut::CustomProtocolClosed { + protocol_id: substream.protocol_id(), + result: Err(err), + }; + return_value = Some(ProtocolsHandlerEvent::Custom(event)); + PerProtocolState::Disabled { + shutdown: shutdown.into_iter().collect(), + reenable: false + } + } + } + } + + PerProtocolState::Normal(mut norm_state) => { + if let Some(event) = norm_state.poll(self.protocol.id()) { + return_value = Some(ProtocolsHandlerEvent::Custom(event)); + } else { + return_value = None; + } + + PerProtocolState::Normal(norm_state) + } + + PerProtocolState::Disabled { mut shutdown, reenable } => { + shutdown_list(&mut shutdown); + // If `reenable` is `true`, that means we should open the substreams system again + // after all the substreams are closed. + if reenable && shutdown.is_empty() { + return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade: self.protocol.clone(), + info: self.protocol.id(), + }); + PerProtocolState::Opening + } else { + return_value = None; + PerProtocolState::Disabled { shutdown, reenable } + } + } + + PerProtocolState::ShuttingDown(mut list) => { + shutdown_list(&mut list); + return_value = None; + PerProtocolState::ShuttingDown(list) + } + }; + + return_value + } +} + +impl PerProtocolNormalState +where TMessage: CustomMessage, TSubstream: AsyncRead + AsyncWrite { + /// Polls for things that are new. Same API constraints as `Future::poll()`. + /// Optionally returns the event to produce. + /// You must pass the `protocol_id` as we need have to inject it in the returned event. + /// API note: Ideally we wouldn't need to be passed a `ProtocolId`, and we would return a + /// different enum that doesn't contain any `protocol_id`, and the caller would inject + /// the ID itself, but that's a ton of code for not much gain. + fn poll(&mut self, protocol_id: ProtocolId) -> Option> { + for n in (0..self.pending_response.len()).rev() { + let (request_id, mut substream) = self.pending_response.swap_remove(n); + match substream.poll() { + Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { + if message.request_id() == CustomMessageId::Response(request_id) { + let event = CustomProtosHandlerOut::CustomMessage { + protocol_id: substream.protocol_id(), + message + }; + self.shutdown.push(substream); + return Some(event); + } else { + self.shutdown.push(substream); + let event = CustomProtosHandlerOut::ProtocolError { + protocol_id, + is_severe: true, + error: format!("Request ID doesn't match substream: expected {:?}, \ + got {:?}", request_id, message.request_id()).into(), + }; + return Some(event); + } + }, + Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) => + unreachable!("Cannot receive Clogged message with new protocol version; QED"), + Ok(Async::NotReady) => + self.pending_response.push((request_id, substream)), + Ok(Async::Ready(None)) => { + self.shutdown.push(substream); + let event = CustomProtosHandlerOut::ProtocolError { + protocol_id, + is_severe: false, + error: format!("Request ID {:?} didn't receive an answer", request_id).into(), + }; + return Some(event); + } + Err(err) => { + self.shutdown.push(substream); + let event = CustomProtosHandlerOut::ProtocolError { + protocol_id, + is_severe: false, + error: format!("Error while waiting for an answer for {:?}: {}", + request_id, err).into(), + }; + return Some(event); + } + } + } + + for n in (0..self.incoming_substreams.len()).rev() { + let mut substream = self.incoming_substreams.swap_remove(n); + match substream.poll() { + Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { + let protocol_id = substream.protocol_id(); + if let CustomMessageId::Request(id) = message.request_id() { + self.pending_send_back.push((id, substream)); + return Some(CustomProtosHandlerOut::CustomMessage { + protocol_id, + message + }); + } else if let CustomMessageId::OneWay = message.request_id() { + self.shutdown.push(substream); + return Some(CustomProtosHandlerOut::CustomMessage { + protocol_id, + message + }); + } else { + self.shutdown.push(substream); + return Some(CustomProtosHandlerOut::ProtocolError { + protocol_id, + is_severe: true, + error: format!("Received response in new substream").into(), + }); + } + }, + Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { .. }))) => + unreachable!("Cannot receive Clogged message with new protocol version; QED"), + Ok(Async::NotReady) => + self.incoming_substreams.push(substream), + Ok(Async::Ready(None)) => {} + Err(err) => { + self.shutdown.push(substream); + return Some(CustomProtosHandlerOut::ProtocolError { + protocol_id, + is_severe: false, + error: format!("Error in incoming substream: {}", err).into(), + }); + } + } + } + + shutdown_list(&mut self.shutdown); + None + } } /// Event that can be received by a `CustomProtosHandler`. #[derive(Debug)] pub enum CustomProtosHandlerIn { - /// The node should start using custom protocols and actively open substreams. - EnableActive, - - /// The node should listen to custom protocols but not open substreams. - EnablePassive, + /// The node should start using custom protocols. Contains whether we are the dialer or the + /// listener of the connection. + Enable(Endpoint), /// The node should stop using custom protocols. Disable, @@ -133,59 +604,226 @@ pub enum CustomProtosHandlerOut { ProtocolError { /// Protocol for which the error happened. protocol_id: ProtocolId, + /// If true the error is severe, such as a protocol violation. + is_severe: bool, /// The error that happened. - error: ProtocolsHandlerUpgrErr, + error: Box, }, } impl CustomProtosHandler where TSubstream: AsyncRead + AsyncWrite, + TMessage: CustomMessage, { /// Builds a new `CustomProtosHandler`. pub fn new(protocols: RegisteredProtocols) -> Self { CustomProtosHandler { - protocols, - // We keep the connection alive for at least 5 seconds, waiting for what happens. - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(5)), - state: State::Init(SmallVec::new()), - substreams: SmallVec::new(), + protocols: protocols.0.into_iter().map(|protocol| { + PerProtocol { + protocol, + state: PerProtocolState::Init(SmallVec::new()), + } + }).collect(), events_queue: SmallVec::new(), + warm_up_end: Instant::now() + Duration::from_secs(5), + } + } + + /// Enables the handler for all protocols. + fn enable(&mut self, endpoint: Endpoint) { + for protocol in &mut self.protocols { + if let Some(message) = protocol.enable(endpoint) { + self.events_queue.push(message); + } + } + } + + /// Disables the handler for all protocols. + fn disable(&mut self) { + for protocol in &mut self.protocols { + if protocol.disable() { + let event = CustomProtosHandlerOut::CustomProtocolClosed { + protocol_id: protocol.protocol.id(), + result: Ok(()) + }; + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + } } } /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. fn inject_fully_negotiated( &mut self, - proto: RegisteredProtocolSubstream, + mut substream: RegisteredProtocolSubstream ) { - if self.substreams.iter().any(|p| p.protocol_id() == proto.protocol_id()) { - // Skipping protocol that's already open. - return - } - - match self.state { - State::Init(ref mut pending) => { - if pending.iter().all(|p| p.protocol_id() != proto.protocol_id()) { - pending.push(proto); - } + let state = match self.protocols.iter_mut().find(|p| p.protocol.id() == substream.protocol_id()) { + Some(p) => &mut p.state, + None => { + error!(target: "sub-libp2p", "Found unknown protocol ID {:?}", + substream.protocol_id()); return }, - // TODO: we should shut down refused substreams gracefully; this should be fixed - // at the same time as https://github.com/paritytech/substrate/issues/1517 - State::Disabled | State::ShuttingDown => return, - State::Normal => () - } - - let event = CustomProtosHandlerOut::CustomProtocolOpen { - protocol_id: proto.protocol_id(), - version: proto.protocol_version(), }; - self.keep_alive = KeepAlive::Forever; + *state = match mem::replace(state, PerProtocolState::Poisoned) { + PerProtocolState::Poisoned => { + error!(target: "sub-libp2p", "Handler is in poisoned state"); + PerProtocolState::Poisoned + } - self.substreams.push(proto); - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + PerProtocolState::Init(mut incoming) => { + if substream.endpoint() == Endpoint::Dialer { + error!(target: "sub-libp2p", "Opened dialing substream before initialization"); + } + incoming.push(substream); + PerProtocolState::Init(incoming) + } + + PerProtocolState::Opening => { + let event = CustomProtosHandlerOut::CustomProtocolOpen { + protocol_id: substream.protocol_id(), + version: substream.protocol_version() + }; + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + + match (substream.endpoint(), substream.is_multiplex()) { + (Endpoint::Dialer, true) => { + PerProtocolState::Normal(PerProtocolNormalState { + outgoing_substream: Some(substream), + incoming_substreams: SmallVec::new(), + pending_response: SmallVec::new(), + pending_send_back: SmallVec::new(), + pending_messages: SmallVec::new(), + shutdown: SmallVec::new(), + }) + }, + (Endpoint::Listener, true) => { + PerProtocolState::Normal(PerProtocolNormalState { + outgoing_substream: None, + incoming_substreams: smallvec![substream], + pending_response: SmallVec::new(), + pending_send_back: SmallVec::new(), + pending_messages: SmallVec::new(), + shutdown: SmallVec::new(), + }) + }, + (_, false) => { + PerProtocolState::BackCompat { + substream, + shutdown: SmallVec::new() + } + }, + } + } + + PerProtocolState::BackCompat { substream: existing, mut shutdown } => { + warn!(target: "sub-libp2p", "Received extra substream after having already one \ + open in backwards-compatibility mode"); + substream.shutdown(); + shutdown.push(substream); + PerProtocolState::BackCompat { substream: existing, shutdown } + } + + PerProtocolState::Normal(mut state) => { + if substream.endpoint() == Endpoint::Listener { + state.incoming_substreams.push(substream); + } else if !state.pending_messages.is_empty() { + let message = state.pending_messages.remove(0); + let request_id = message.request_id(); + substream.send_message(message); + if let CustomMessageId::Request(request_id) = request_id { + state.pending_response.push((request_id, substream)); + } else { + state.shutdown.push(substream); + } + } else { + debug!(target: "sub-libp2p", "Opened spurious outbound substream"); + substream.shutdown(); + state.shutdown.push(substream); + } + + PerProtocolState::Normal(state) + } + + PerProtocolState::Disabled { mut shutdown, .. } => { + substream.shutdown(); + shutdown.push(substream); + PerProtocolState::Disabled { shutdown, reenable: false } + } + + PerProtocolState::ShuttingDown(mut list) => { + substream.shutdown(); + list.push(substream); + PerProtocolState::ShuttingDown(list) + } + }; + } + + /// Sends a message to the remote. + fn send_message(&mut self, protocol: ProtocolId, message: TMessage) { + let (protocol, state) = match self.protocols.iter_mut().find(|p| p.protocol.id() == protocol) { + Some(p) => (&mut p.protocol, &mut p.state), + None => { + error!(target: "sub-libp2p", "Tried to send message over unknown protocol ID {:?}", + protocol); + return + }, + }; + + match *state { + PerProtocolState::BackCompat { ref mut substream, .. } => + substream.send_message(message), + + PerProtocolState::Normal(ref mut state) => { + if let CustomMessageId::Request(request_id) = message.request_id() { + if let Some(mut outgoing_substream) = state.outgoing_substream.take() { + outgoing_substream.send_message(message); + state.pending_response.push((request_id, outgoing_substream)); + } else { + if state.pending_messages.len() >= 2048 { + let event = CustomProtosHandlerOut::Clogged { + messages: Vec::new(), + protocol_id: protocol.id() + }; + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + } + state.pending_messages.push(message); + self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade: protocol.clone(), + info: protocol.id() + }); + } + } else if let CustomMessageId::Response(request_id) = message.request_id() { + if let Some(pos) = state.pending_send_back.iter().position(|(id, _)| *id == request_id) { + let (_, mut substream) = state.pending_send_back.remove(pos); + substream.send_message(message); + state.shutdown.push(substream); + } else { + warn!(target: "sub-libp2p", "Libp2p layer received response to a \ + non-existing request ID {:?}", request_id); + } + } else if let Some(mut outgoing_substream) = state.outgoing_substream.take() { + outgoing_substream.send_message(message); + state.shutdown.push(outgoing_substream); + } else { + if state.pending_messages.len() >= 2048 { + let event = CustomProtosHandlerOut::Clogged { + messages: Vec::new(), + protocol_id: protocol.id() + }; + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + } + state.pending_messages.push(message); + self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade: protocol.clone(), + info: protocol.id() + }); + } + } + + _ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol") + } } } @@ -201,7 +839,7 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { #[inline] fn listen_protocol(&self) -> Self::InboundProtocol { - self.protocols.clone() + RegisteredProtocols(self.protocols.iter().map(|p| p.protocol.clone()).collect()) } fn inject_fully_negotiated_inbound( @@ -222,69 +860,10 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { fn inject_event(&mut self, message: CustomProtosHandlerIn) { match message { - CustomProtosHandlerIn::Disable => { - match self.state { - State::Init(_) | State::Normal => self.state = State::Disabled, - State::Disabled | State::ShuttingDown => (), - } - - self.keep_alive = KeepAlive::Now; - for substream in self.substreams.iter_mut() { - substream.shutdown(); - } - }, - CustomProtosHandlerIn::EnableActive | CustomProtosHandlerIn::EnablePassive => { - match self.state { - State::Init(ref mut list) => { - for proto in list.drain() { - let event = CustomProtosHandlerOut::CustomProtocolOpen { - protocol_id: proto.protocol_id(), - version: proto.protocol_version(), - }; - - self.substreams.push(proto); - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - } - - self.state = State::Normal; - } - State::Disabled => self.state = State::Normal, - State::Normal | State::ShuttingDown => (), - } - - self.keep_alive = KeepAlive::Forever; - - // Try open one substream for each registered protocol. - if let CustomProtosHandlerIn::EnableActive = message { - for protocol in self.protocols.0.iter() { - if self.substreams.iter().any(|p| p.protocol_id() == protocol.id()) { - // Skipping protocol that's already open. - continue - } - - self.events_queue.push(ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade: protocol.clone(), - info: protocol.id(), - }); - } - } - }, - CustomProtosHandlerIn::SendCustomMessage { protocol, message } => { - debug_assert!(self.protocols.has_protocol(protocol), - "invalid protocol id requested in the API of the libp2p networking"); - let proto = match self.substreams.iter_mut().find(|p| p.protocol_id() == protocol) { - Some(proto) => proto, - None => { - // We are processing a message event before we could report to the outside - // that we disconnected from the protocol. This is not an error. - trace!(target: "sub-libp2p", "Tried to send message through closed \ - protocol"); - return - }, - }; - - proto.send_message(message); - }, + CustomProtosHandlerIn::Disable => self.disable(), + CustomProtosHandlerIn::Enable(endpoint) => self.enable(endpoint), + CustomProtosHandlerIn::SendCustomMessage { protocol, message } => + self.send_message(protocol, message), } } @@ -292,31 +871,48 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { fn inject_inbound_closed(&mut self) {} #[inline] - fn inject_dial_upgrade_error(&mut self, protocol_id: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr) { - if let State::Normal = self.state { - let event = CustomProtosHandlerOut::ProtocolError { protocol_id, error }; - self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); - } + fn inject_dial_upgrade_error(&mut self, protocol_id: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr) { + let is_severe = match err { + ProtocolsHandlerUpgrErr::Upgrade(_) => true, + _ => false, + }; - // Right now if the remote doesn't support one of the custom protocols, we shut down the - // entire connection. This is a hack-ish solution to the problem where we connect to nodes - // that support libp2p but not the testnet that we want. - self.shutdown(); + self.events_queue.push(ProtocolsHandlerEvent::Custom(CustomProtosHandlerOut::ProtocolError { + protocol_id, + is_severe, + error: Box::new(err), + })); + + // If we failed to open a substream, there is little chance that we manage to open any + // other substream ever again on this connection, and thus we disable the handler. + self.disable(); } - #[inline] fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + if self.warm_up_end >= Instant::now() { + return KeepAlive::Until(self.warm_up_end) + } + + for protocol in self.protocols.iter() { + match protocol.state { + PerProtocolState::Disabled { .. } | PerProtocolState::ShuttingDown(_) | + PerProtocolState::Poisoned => return KeepAlive::Now, + _ => {} + } + } + + KeepAlive::Forever } fn shutdown(&mut self) { - match self.state { - State::Init(_) | State::Normal | State::Disabled => self.state = State::ShuttingDown, - State::ShuttingDown => (), - } - - for substream in self.substreams.iter_mut() { - substream.shutdown(); + for protocol in &mut self.protocols { + if protocol.shutdown() { + let event = CustomProtosHandlerOut::CustomProtocolClosed { + protocol_id: protocol.protocol.id(), + result: Ok(()) + }; + self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); + } } } @@ -326,61 +922,27 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { ProtocolsHandlerEvent, Self::Error, > { + // Flush the events queue if necessary. if !self.events_queue.is_empty() { let event = self.events_queue.remove(0); return Ok(Async::Ready(event)) } - if let State::ShuttingDown = self.state { - if self.substreams.is_empty() { - return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) + // Process all the substreams. + for protocol in self.protocols.iter_mut() { + if let Some(event) = protocol.poll() { + return Ok(Async::Ready(event)) } } - for n in (0..self.substreams.len()).rev() { - let mut substream = self.substreams.swap_remove(n); - match substream.poll() { - Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { - let event = CustomProtosHandlerOut::CustomMessage { - protocol_id: substream.protocol_id(), - message - }; - self.substreams.push(substream); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event))) - }, - Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => { - let event = CustomProtosHandlerOut::Clogged { - protocol_id: substream.protocol_id(), - messages, - }; - self.substreams.push(substream); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event))) - }, - Ok(Async::NotReady) => - self.substreams.push(substream), - Ok(Async::Ready(None)) => { - // Close the connection as soon as possible. - if self.substreams.is_empty() { - self.keep_alive = KeepAlive::Now; - } - let event = CustomProtosHandlerOut::CustomProtocolClosed { - protocol_id: substream.protocol_id(), - result: Ok(()) - }; - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event))) - }, - Err(err) => { - // Close the connection as soon as possible. - if self.substreams.is_empty() { - self.keep_alive = KeepAlive::Now; - } - let event = CustomProtosHandlerOut::CustomProtocolClosed { - protocol_id: substream.protocol_id(), - result: Err(err) - }; - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(event))) - }, - } + // Shut down the node if everything is closed. + let can_shut_down = self.protocols.iter().all(|p| + match p.state { + PerProtocolState::ShuttingDown(ref list) if list.is_empty() => true, + _ => false + }); + if can_shut_down { + return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) } Ok(Async::NotReady) @@ -393,8 +955,24 @@ where { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { f.debug_struct("CustomProtosHandler") - .field("protocols", &self.protocols.len()) - .field("substreams", &self.substreams.len()) .finish() } } + +/// Given a list of substreams, tries to shut them down. The substreams that have been successfully +/// shut down are removed from the list. +fn shutdown_list + (list: &mut SmallVec>>) +where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { + 'outer: for n in (0..list.len()).rev() { + let mut substream = list.swap_remove(n); + loop { + match substream.poll() { + Ok(Async::Ready(Some(_))) => {} + Ok(Async::NotReady) => break, + Err(_) | Ok(Async::Ready(None)) => continue 'outer, + } + } + list.push(substream); + } +} diff --git a/substrate/core/network-libp2p/src/custom_proto/mod.rs b/substrate/core/network-libp2p/src/custom_proto/mod.rs index 8a0b55d5cc..073ce8360a 100644 --- a/substrate/core/network-libp2p/src/custom_proto/mod.rs +++ b/substrate/core/network-libp2p/src/custom_proto/mod.rs @@ -15,7 +15,7 @@ // along with Substrate. If not, see . pub use self::behaviour::{CustomProtos, CustomProtosOut}; -pub use self::upgrade::{CustomMessage, RegisteredProtocol, RegisteredProtocols}; +pub use self::upgrade::{CustomMessage, CustomMessageId, RegisteredProtocol, RegisteredProtocols}; mod behaviour; mod handler; diff --git a/substrate/core/network-libp2p/src/custom_proto/topology.rs b/substrate/core/network-libp2p/src/custom_proto/topology.rs index 84d77b55a2..4ad0e8d145 100644 --- a/substrate/core/network-libp2p/src/custom_proto/topology.rs +++ b/substrate/core/network-libp2p/src/custom_proto/topology.rs @@ -415,7 +415,12 @@ impl NetTopology { continue } - debug_assert!(!a.is_connected()); + // It is possible that we are connected to this address, and that the dial failure + // concerns another peer. + if a.is_connected() { + continue + } + a.adjust_score(SCORE_DIFF_ON_FAILED_TO_CONNECT); trace!(target: "sub-libp2p", "Back off for {} = {:?}", addr, a.next_back_off); a.back_off_until = Instant::now() + a.next_back_off; diff --git a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs index 3f31dc0a5a..292993957d 100644 --- a/substrate/core/network-libp2p/src/custom_proto/upgrade.rs +++ b/substrate/core/network-libp2p/src/custom_proto/upgrade.rs @@ -16,10 +16,10 @@ use crate::ProtocolId; use bytes::Bytes; -use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; +use libp2p::core::{Endpoint, UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade::ProtocolName}; use libp2p::tokio_codec::Framed; use log::warn; -use std::{collections::VecDeque, io, marker::PhantomData, vec::IntoIter as VecIntoIter}; +use std::{collections::VecDeque, io, iter, marker::PhantomData, vec::IntoIter as VecIntoIter}; use futures::{prelude::*, future, stream}; use tokio_io::{AsyncRead, AsyncWrite}; use unsigned_varint::codec::UviBytes; @@ -84,6 +84,9 @@ impl Clone for RegisteredProtocol { pub struct RegisteredProtocolSubstream { /// If true, we are in the process of closing the sink. is_closing: bool, + /// Whether the local node opened this substream (dialer), or we received this substream from + /// the remote (listener). + endpoint: Endpoint, /// Buffer of packets to send. send_queue: VecDeque>, /// If true, we should call `poll_complete` on the inner sink. @@ -97,6 +100,9 @@ pub struct RegisteredProtocolSubstream { /// If true, we have sent a "remote is clogged" event recently and shouldn't send another one /// unless the buffer empties then fills itself again. clogged_fuse: bool, + /// If true, then this substream uses the "/multi/" version of the protocol. This is a hint + /// that the handler can behave differently. + is_multiplex: bool, /// Marker to pin the generic. marker: PhantomData, } @@ -114,6 +120,18 @@ impl RegisteredProtocolSubstream { self.protocol_version } + /// Returns whether the local node opened this substream (dialer), or we received this + /// substream from the remote (listener). + pub fn endpoint(&self) -> Endpoint { + self.endpoint + } + + /// Returns true if we negotiated the "multiplexed" version. This means that the handler can + /// open multiple substreams instead of just one. + pub fn is_multiplex(&self) -> bool { + self.is_multiplex + } + /// Starts a graceful shutdown process on this substream. /// /// Note that "graceful" means that we sent a closing message. We don't wait for any @@ -138,14 +156,39 @@ impl RegisteredProtocolSubstream { /// Implemented on messages that can be sent or received on the network. pub trait CustomMessage { - /// Turns a message into raw bytes. + /// Turns a message into the raw bytes to send over the network. fn into_bytes(self) -> Vec; - /// Tries to part `bytes` into a message. + + /// Tries to parse `bytes` received from the network into a message. fn from_bytes(bytes: &[u8]) -> Result where Self: Sized; + + /// Returns a unique ID that is used to match request and responses. + /// + /// The networking layer employs multiplexing in order to have multiple parallel data streams. + /// Transmitting messages over the network uses two kinds of substreams: + /// + /// - Undirectional substreams, where we send a single message then close the substream. + /// - Bidirectional substreams, where we send a message then wait for a response. Once the + /// response has arrived, we close the substream. + /// + /// If `request_id()` returns `OneWay`, then this message will be sent or received over a + /// unidirectional substream. If instead it returns `Request` or `Response`, then we use the + /// value to match a request with its response. + fn request_id(&self) -> CustomMessageId; } -/// This trait implementation exists mostly for testing convenience. +/// See the documentation of `CustomMessage::request_id`. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CustomMessageId { + OneWay, + Request(u64), + Response(u64), +} + +// These trait implementations exist mostly for testing convenience. This should eventually be +// removed. + impl CustomMessage for Vec { fn into_bytes(self) -> Vec { self @@ -154,6 +197,45 @@ impl CustomMessage for Vec { fn from_bytes(bytes: &[u8]) -> Result { Ok(bytes.to_vec()) } + + fn request_id(&self) -> CustomMessageId { + CustomMessageId::OneWay + } +} + +impl CustomMessage for (Option, Vec) { + fn into_bytes(self) -> Vec { + use byteorder::WriteBytesExt; + use std::io::Write; + let mut out = Vec::new(); + out.write_u64::(self.0.unwrap_or(u64::max_value())) + .expect("Writing to a Vec can never fail"); + out.write_all(&self.1).expect("Writing to a Vec can never fail"); + out + } + + fn from_bytes(bytes: &[u8]) -> Result { + use byteorder::ReadBytesExt; + use std::io::Read; + let mut rdr = std::io::Cursor::new(bytes); + let id = rdr.read_u64::().map_err(|_| ())?; + let mut out = Vec::new(); + rdr.read_to_end(&mut out).map_err(|_| ())?; + let id = if id == u64::max_value() { + None + } else { + Some(id) + }; + Ok((id, out)) + } + + fn request_id(&self) -> CustomMessageId { + if let Some(id) = self.0 { + CustomMessageId::Request(id) + } else { + CustomMessageId::OneWay + } + } } /// Event produced by the `RegisteredProtocolSubstream`. @@ -176,11 +258,6 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { type Error = io::Error; fn poll(&mut self) -> Poll, Self::Error> { - // If we are closing, close as soon as the Sink is closed. - if self.is_closing { - return Ok(self.inner.close()?.map(|()| None)) - } - // Flushing the local queue. while let Some(packet) = self.send_queue.pop_front() { match self.inner.start_send(packet)? { @@ -192,6 +269,11 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { } } + // If we are closing, close as soon as the Sink is closed. + if self.is_closing { + return Ok(self.inner.close()?.map(|()| None)) + } + // Indicating that the remote is clogged if that's the case. if self.send_queue.len() >= 2048 { if !self.clogged_fuse { @@ -227,13 +309,13 @@ where TSubstream: AsyncRead + AsyncWrite, TMessage: CustomMessage { io::ErrorKind::InvalidData })?; Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) - }, + } Async::Ready(None) => if !self.requires_poll_complete && self.send_queue.is_empty() { Ok(Async::Ready(None)) } else { Ok(Async::NotReady) - }, + } Async::NotReady => Ok(Async::NotReady), } } @@ -246,14 +328,33 @@ impl UpgradeInfo for RegisteredProtocol { #[inline] fn protocol_info(&self) -> Self::InfoIter { // Report each version as an individual protocol. - self.supported_versions.iter().map(|&version| { + self.supported_versions.iter().flat_map(|&version| { let num = version.to_string(); - let mut name = self.base_name.clone(); - name.extend_from_slice(num.as_bytes()); - RegisteredProtocolName { - name, + + // Note that `name1` is the multiplex version, as we priviledge it over the old one. + let mut name1 = self.base_name.clone(); + name1.extend_from_slice(b"multi/"); + name1.extend_from_slice(num.as_bytes()); + let proto1 = RegisteredProtocolName { + name: name1, version, - } + is_multiplex: true, + }; + + let mut name2 = self.base_name.clone(); + name2.extend_from_slice(num.as_bytes()); + let proto2 = RegisteredProtocolName { + name: name2, + version, + is_multiplex: false, + }; + + // Important note: we prioritize the backwards compatible mode for now. + // After some intensive testing has been done, we should switch to the new mode by + // default. + // Then finally we can remove the old mode after everyone has switched. + // See https://github.com/paritytech/substrate/issues/1692 + iter::once(proto2).chain(iter::once(proto1)) }).collect::>().into_iter() } } @@ -265,6 +366,8 @@ pub struct RegisteredProtocolName { name: Bytes, /// Version number. Stored in string form in `name`, but duplicated here for easier retrieval. version: u8, + /// If true, then this version is the one with the multiplexing. + is_multiplex: bool, } impl ProtocolName for RegisteredProtocolName { @@ -289,12 +392,14 @@ where TSubstream: AsyncRead + AsyncWrite, future::ok(RegisteredProtocolSubstream { is_closing: false, + endpoint: Endpoint::Listener, send_queue: VecDeque::new(), requires_poll_complete: false, inner: framed.fuse(), protocol_id: self.id, protocol_version: info.version, clogged_fuse: false, + is_multiplex: info.is_multiplex, marker: PhantomData, }) } @@ -312,8 +417,20 @@ where TSubstream: AsyncRead + AsyncWrite, socket: TSubstream, info: Self::Info, ) -> Self::Future { - // Upgrades are symmetrical. - self.upgrade_inbound(socket, info) + let framed = Framed::new(socket, UviBytes::default()); + + future::ok(RegisteredProtocolSubstream { + is_closing: false, + endpoint: Endpoint::Dialer, + send_queue: VecDeque::new(), + requires_poll_complete: false, + inner: framed.fuse(), + protocol_id: self.id, + protocol_version: info.version, + clogged_fuse: false, + is_multiplex: info.is_multiplex, + marker: PhantomData, + }) } } @@ -326,11 +443,6 @@ impl RegisteredProtocols { pub fn len(&self) -> usize { self.0.len() } - - /// Returns true if the given protocol is in the list. - pub fn has_protocol(&self, protocol: ProtocolId) -> bool { - self.0.iter().any(|p| p.id == protocol) - } } impl Default for RegisteredProtocols { diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index 1f498e22fd..a6f66c6026 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -24,7 +24,7 @@ mod service_task; mod traits; mod transport; -pub use crate::custom_proto::{CustomMessage, RegisteredProtocol}; +pub use crate::custom_proto::{CustomMessage, CustomMessageId, RegisteredProtocol}; pub use crate::error::{Error, ErrorKind, DisconnectReason}; pub use crate::secret::obtain_private_key; pub use crate::service_task::{start_service, Service, ServiceEvent}; diff --git a/substrate/core/network-libp2p/tests/test.rs b/substrate/core/network-libp2p/tests/test.rs index d9c6dbe2f1..40b9598f54 100644 --- a/substrate/core/network-libp2p/tests/test.rs +++ b/substrate/core/network-libp2p/tests/test.rs @@ -15,6 +15,7 @@ // along with Substrate. If not, see . use futures::{future, stream, prelude::*, try_ready}; +use rand::seq::SliceRandom; use std::{io, iter}; use substrate_network_libp2p::{CustomMessage, Protocol, ServiceEvent, build_multiaddr}; @@ -85,7 +86,10 @@ fn basic_two_nodes_connectivity() { fn two_nodes_transfer_lots_of_packets() { // We spawn two nodes, then make the first one send lots of packets to the second one. The test // ends when the second one has received all of them. - const NUM_PACKETS: u32 = 20000; + + // Note that if we go too high, we will reach the limit to the number of simultaneous + // substreams allowed by the multiplexer. + const NUM_PACKETS: u32 = 5000; let (mut service1, mut service2) = { let mut l = build_nodes::>(2).into_iter(); @@ -114,7 +118,6 @@ fn two_nodes_transfer_lots_of_packets() { Some(ServiceEvent::OpenedCustomProtocol { .. }) => {}, Some(ServiceEvent::CustomMessage { message, .. }) => { assert_eq!(message.len(), 1); - assert_eq!(u32::from(message[0]), packet_counter % 256); packet_counter += 1; if packet_counter == NUM_PACKETS { return Ok(Async::Ready(())) @@ -189,3 +192,69 @@ fn many_nodes_connectivity() { tokio::runtime::Runtime::new().unwrap().block_on(combined).unwrap(); } + +#[test] +fn basic_two_nodes_requests_in_parallel() { + let (mut service1, mut service2) = { + let mut l = build_nodes::<(Option, Vec)>(2).into_iter(); + let a = l.next().unwrap(); + let b = l.next().unwrap(); + (a, b) + }; + + // Generate random messages with or without a request id. + let mut to_send = { + let mut to_send = Vec::new(); + let mut next_id = 0; + for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode. + let id = if rand::random::() % 4 != 0 { + let i = next_id; + next_id += 1; + Some(i) + } else { + None + }; + + let msg = (id, (0..10).map(|_| rand::random::()).collect::>()); + to_send.push(msg); + } + to_send + }; + + // Clone `to_send` in `to_receive`. Below we will remove from `to_receive` the messages we + // receive, until the list is empty. + let mut to_receive = to_send.clone(); + to_send.shuffle(&mut rand::thread_rng()); + + let fut1 = future::poll_fn(move || -> io::Result<_> { + loop { + match try_ready!(service1.poll()) { + Some(ServiceEvent::OpenedCustomProtocol { node_index, protocol, .. }) => { + for msg in to_send.drain(..) { + service1.send_custom_message(node_index, protocol, msg); + } + }, + _ => panic!(), + } + } + }); + + let fut2 = future::poll_fn(move || -> io::Result<_> { + loop { + match try_ready!(service2.poll()) { + Some(ServiceEvent::OpenedCustomProtocol { .. }) => {}, + Some(ServiceEvent::CustomMessage { message, .. }) => { + let pos = to_receive.iter().position(|m| *m == message).unwrap(); + to_receive.remove(pos); + if to_receive.is_empty() { + return Ok(Async::Ready(())) + } + } + _ => panic!(), + } + } + }); + + let combined = fut1.select(fut2).map_err(|(err, _)| err); + tokio::runtime::Runtime::new().unwrap().block_on_all(combined).unwrap(); +} diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs index b54032d317..416574e387 100644 --- a/substrate/core/network/src/message.rs +++ b/substrate/core/network/src/message.rs @@ -129,7 +129,7 @@ pub struct RemoteReadResponse { /// Generic types. pub mod generic { use parity_codec::{Encode, Decode}; - use network_libp2p::CustomMessage; + use network_libp2p::{CustomMessage, CustomMessageId}; use runtime_primitives::Justification; use parity_codec_derive::{Encode, Decode}; use crate::config::Roles; @@ -218,6 +218,26 @@ pub mod generic { fn from_bytes(bytes: &[u8]) -> Result { Decode::decode(&mut &bytes[..]).ok_or(()) } + + fn request_id(&self) -> CustomMessageId { + match *self { + Message::Status(_) => CustomMessageId::OneWay, + Message::BlockRequest(ref req) => CustomMessageId::Request(req.id), + Message::BlockResponse(ref resp) => CustomMessageId::Response(resp.id), + Message::BlockAnnounce(_) => CustomMessageId::OneWay, + Message::Transactions(_) => CustomMessageId::OneWay, + Message::Consensus(_) => CustomMessageId::OneWay, + Message::RemoteCallRequest(ref req) => CustomMessageId::Request(req.id), + Message::RemoteCallResponse(ref resp) => CustomMessageId::Response(resp.id), + Message::RemoteReadRequest(ref req) => CustomMessageId::Request(req.id), + Message::RemoteReadResponse(ref resp) => CustomMessageId::Response(resp.id), + Message::RemoteHeaderRequest(ref req) => CustomMessageId::Request(req.id), + Message::RemoteHeaderResponse(ref resp) => CustomMessageId::Response(resp.id), + Message::RemoteChangesRequest(ref req) => CustomMessageId::Request(req.id), + Message::RemoteChangesResponse(ref resp) => CustomMessageId::Response(resp.id), + Message::ChainSpecific(_) => CustomMessageId::OneWay, + } + } } /// Status sent on connection.