diff --git a/substrate/core/network-libp2p/src/custom_proto/handler.rs b/substrate/core/network-libp2p/src/custom_proto/handler.rs index 6794a9b12c..ee2ac22c5e 100644 --- a/substrate/core/network-libp2p/src/custom_proto/handler.rs +++ b/substrate/core/network-libp2p/src/custom_proto/handler.rs @@ -26,7 +26,7 @@ use libp2p::core::{ upgrade::{InboundUpgrade, OutboundUpgrade} }; use log::{debug, error}; -use smallvec::SmallVec; +use smallvec::{smallvec, SmallVec}; use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, clock::Clock}; @@ -73,13 +73,18 @@ use void::Void; /// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to /// toggle the handler between the disabled and enabled states. /// -/// When the handler is enabled for the first time, if it is the dialer of the connection, it tries -/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx`, where -/// `xxx` is chosen by the user. +/// When the handler switches to "enabled", it opens a substream and negotiates the protocol named +/// `/substrate/xxx`, where `xxx` is chosen by the user and depends on the chain. /// -/// Then, we have one unique substream where bidirectional communications happen. If the remote -/// closes the substream, we consider that we are now disconnected. Re-enabling is performed by -/// re-opening the substream (even if we are not the dialer of the connection). +/// For backwards compatibility reasons, when we switch to "enabled" for the first time (while we +/// are still in "init" mode) and we are the connection listener, we don't open a substream. +/// +/// In order the handle the situation where both the remote and us get enabled at the same time, +/// we tolerate multiple substreams open at the same time. Messages are transmitted on an arbitrary +/// substream. The endpoints don't try to agree on a single substream. +/// +/// We consider that we are now "closed" if the remote closes all the existing substreams. +/// Re-opening it can then be performed by closing all active substream and re-opening one. /// pub struct CustomProtoHandlerProto { /// Configuration for the protocol upgrade to negotiate. @@ -169,11 +174,11 @@ enum ProtocolState { deadline: Delay, }, - /// Backwards-compatible mode. Contains the unique substream that is open. + /// Normal operating mode. Contains the substreams that are open. /// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside. Normal { - /// The unique substream where bidirectional communications happen. - substream: RegisteredProtocolSubstream, + /// The substreams where bidirectional communications happen. + substreams: SmallVec<[RegisteredProtocolSubstream; 4]>, /// Contains substreams which are being shut down. shutdown: SmallVec<[RegisteredProtocolSubstream; 4]>, }, @@ -284,8 +289,7 @@ where }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { - substream: incoming.into_iter().next() - .expect("We have a check above that incoming isn't empty; QED"), + substreams: incoming.into_iter().collect(), shutdown: SmallVec::new() } } @@ -319,9 +323,11 @@ where ProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false } } - ProtocolState::Normal { mut substream, mut shutdown } => { - substream.shutdown(); - shutdown.push(substream); + ProtocolState::Normal { substreams, mut shutdown } => { + for mut substream in substreams { + substream.shutdown(); + shutdown.push(substream); + } let event = CustomProtoHandlerOut::CustomProtocolClosed { result: Ok(()) }; @@ -341,13 +347,12 @@ where #[must_use] fn poll_state(&mut self) -> Option, (), CustomProtoHandlerOut>> { - let return_value; - self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { + match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state", self.remote_peer_id); - return_value = None; - ProtocolState::Poisoned + self.state = ProtocolState::Poisoned; + None } ProtocolState::Init { substreams, mut init_deadline } => { @@ -361,8 +366,8 @@ where Err(_) => error!(target: "sub-libp2p", "Tokio timer has errored") } - return_value = None; - ProtocolState::Init { substreams, init_deadline } + self.state = ProtocolState::Init { substreams, init_deadline }; + None } ProtocolState::Opening { mut deadline } => { @@ -373,63 +378,74 @@ where is_severe: true, error: "Timeout when opening protocol".to_string().into(), }; - return_value = Some(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::Opening { deadline } + self.state = ProtocolState::Opening { deadline }; + Some(ProtocolsHandlerEvent::Custom(event)) }, Ok(Async::NotReady) => { - return_value = None; - ProtocolState::Opening { deadline } + self.state = ProtocolState::Opening { deadline }; + None }, Err(_) => { error!(target: "sub-libp2p", "Tokio timer has errored"); deadline.reset(self.clock.now() + Duration::from_secs(60)); - return_value = None; - ProtocolState::Opening { deadline } + self.state = ProtocolState::Opening { deadline }; + None }, } } - ProtocolState::Normal { mut substream, shutdown } => { - match substream.poll() { - Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { - let event = CustomProtoHandlerOut::CustomMessage { - message - }; - return_value = Some(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::Normal { substream, shutdown } - }, - Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => { - let event = CustomProtoHandlerOut::Clogged { - messages, - }; - return_value = Some(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::Normal { substream, shutdown } - } - Ok(Async::NotReady) => { - return_value = None; - ProtocolState::Normal { substream, shutdown } - } - Ok(Async::Ready(None)) => { - let event = CustomProtoHandlerOut::CustomProtocolClosed { - result: Ok(()) - }; - return_value = Some(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::Disabled { - shutdown: shutdown.into_iter().collect(), - reenable: true + ProtocolState::Normal { mut substreams, shutdown } => { + for n in (0..substreams.len()).rev() { + let mut substream = substreams.swap_remove(n); + match substream.poll() { + Ok(Async::NotReady) => substreams.push(substream), + Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => { + let event = CustomProtoHandlerOut::CustomMessage { + message + }; + substreams.push(substream); + self.state = ProtocolState::Normal { substreams, shutdown }; + return Some(ProtocolsHandlerEvent::Custom(event)); + }, + Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => { + let event = CustomProtoHandlerOut::Clogged { + messages, + }; + substreams.push(substream); + self.state = ProtocolState::Normal { substreams, shutdown }; + return Some(ProtocolsHandlerEvent::Custom(event)); } - } - Err(err) => { - let event = CustomProtoHandlerOut::CustomProtocolClosed { - result: Err(err), - }; - return_value = Some(ProtocolsHandlerEvent::Custom(event)); - ProtocolState::Disabled { - shutdown: shutdown.into_iter().collect(), - reenable: true + Ok(Async::Ready(None)) => { + let event = CustomProtoHandlerOut::CustomProtocolClosed { + result: Ok(()) + }; + substreams.push(substream); + self.state = ProtocolState::Disabled { + shutdown: shutdown.into_iter().collect(), + reenable: true + }; + return Some(ProtocolsHandlerEvent::Custom(event)); + } + Err(err) => { + if substreams.is_empty() { + let event = CustomProtoHandlerOut::CustomProtocolClosed { + result: Err(err), + }; + self.state = ProtocolState::Disabled { + shutdown: shutdown.into_iter().collect(), + reenable: true + }; + return Some(ProtocolsHandlerEvent::Custom(event)); + } else { + debug!(target: "sub-libp2p", "Error on extra substream: {:?}", err); + } } } } + + // This code is reached is none if and only if none of the substreams are in a ready state. + self.state = ProtocolState::Normal { substreams, shutdown }; + None } ProtocolState::Disabled { mut shutdown, reenable } => { @@ -437,21 +453,19 @@ where // If `reenable` is `true`, that means we should open the substreams system again // after all the substreams are closed. if reenable && shutdown.is_empty() { - return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { + self.state = ProtocolState::Opening { + deadline: Delay::new(self.clock.now() + Duration::from_secs(60)) + }; + Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(self.protocol.clone()), info: (), - }); - ProtocolState::Opening { - deadline: Delay::new(self.clock.now() + Duration::from_secs(60)) - } + }) } else { - return_value = None; - ProtocolState::Disabled { shutdown, reenable } + self.state = ProtocolState::Disabled { shutdown, reenable }; + None } } - }; - - return_value + } } /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. @@ -481,17 +495,14 @@ where }; self.events_queue.push(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { - substream, + substreams: smallvec![substream], shutdown: SmallVec::new() } } - ProtocolState::Normal { substream: existing, mut shutdown } => { - debug!(target: "sub-libp2p", "Received extra substream after having already one \ - open in backwards-compatibility mode with {:?}", self.remote_peer_id); - substream.shutdown(); - shutdown.push(substream); - ProtocolState::Normal { substream: existing, shutdown } + ProtocolState::Normal { substreams: mut existing, shutdown } => { + existing.push(substream); + ProtocolState::Normal { substreams: existing, shutdown } } ProtocolState::Disabled { mut shutdown, .. } => { @@ -505,8 +516,8 @@ where /// Sends a message to the remote. fn send_message(&mut self, message: TMessage) { match self.state { - ProtocolState::Normal { ref mut substream, .. } => - substream.send_message(message), + ProtocolState::Normal { ref mut substreams, .. } => + substreams[0].send_message(message), _ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \ with {:?}", self.remote_peer_id)