Allow multiple substreams (#2379)

* Allow multiple substreams

* Update core/network-libp2p/src/custom_proto/handler.rs

Co-Authored-By: tomaka <pierre.krieger1708@gmail.com>
This commit is contained in:
Pierre Krieger
2019-04-27 16:33:42 +02:00
committed by Gavin Wood
parent 2b10d8e080
commit 48d53a35b7
@@ -26,7 +26,7 @@ use libp2p::core::{
upgrade::{InboundUpgrade, OutboundUpgrade}
};
use log::{debug, error};
use smallvec::SmallVec;
use smallvec::{smallvec, SmallVec};
use std::{error, fmt, io, marker::PhantomData, mem, time::Duration, time::Instant};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::{Delay, clock::Clock};
@@ -73,13 +73,18 @@ use void::Void;
/// `Disable` or an `Enable` message from the outer layer. At any time, the outer layer is free to
/// toggle the handler between the disabled and enabled states.
///
/// When the handler is enabled for the first time, if it is the dialer of the connection, it tries
/// to open a substream. The substream negotiates either a protocol named `/substrate/xxx`, where
/// `xxx` is chosen by the user.
/// When the handler switches to "enabled", it opens a substream and negotiates the protocol named
/// `/substrate/xxx`, where `xxx` is chosen by the user and depends on the chain.
///
/// Then, we have one unique substream where bidirectional communications happen. If the remote
/// closes the substream, we consider that we are now disconnected. Re-enabling is performed by
/// re-opening the substream (even if we are not the dialer of the connection).
/// For backwards compatibility reasons, when we switch to "enabled" for the first time (while we
/// are still in "init" mode) and we are the connection listener, we don't open a substream.
///
/// In order the handle the situation where both the remote and us get enabled at the same time,
/// we tolerate multiple substreams open at the same time. Messages are transmitted on an arbitrary
/// substream. The endpoints don't try to agree on a single substream.
///
/// We consider that we are now "closed" if the remote closes all the existing substreams.
/// Re-opening it can then be performed by closing all active substream and re-opening one.
///
pub struct CustomProtoHandlerProto<TMessage, TSubstream> {
/// Configuration for the protocol upgrade to negotiate.
@@ -169,11 +174,11 @@ enum ProtocolState<TMessage, TSubstream> {
deadline: Delay,
},
/// Backwards-compatible mode. Contains the unique substream that is open.
/// Normal operating mode. Contains the substreams that are open.
/// If we are in this state, we have sent a `CustomProtocolOpen` message to the outside.
Normal {
/// The unique substream where bidirectional communications happen.
substream: RegisteredProtocolSubstream<TMessage, TSubstream>,
/// The substreams where bidirectional communications happen.
substreams: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
/// Contains substreams which are being shut down.
shutdown: SmallVec<[RegisteredProtocolSubstream<TMessage, TSubstream>; 4]>,
},
@@ -284,8 +289,7 @@ where
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substream: incoming.into_iter().next()
.expect("We have a check above that incoming isn't empty; QED"),
substreams: incoming.into_iter().collect(),
shutdown: SmallVec::new()
}
}
@@ -319,9 +323,11 @@ where
ProtocolState::Disabled { shutdown: SmallVec::new(), reenable: false }
}
ProtocolState::Normal { mut substream, mut shutdown } => {
substream.shutdown();
shutdown.push(substream);
ProtocolState::Normal { substreams, mut shutdown } => {
for mut substream in substreams {
substream.shutdown();
shutdown.push(substream);
}
let event = CustomProtoHandlerOut::CustomProtocolClosed {
result: Ok(())
};
@@ -341,13 +347,12 @@ where
#[must_use]
fn poll_state(&mut self)
-> Option<ProtocolsHandlerEvent<RegisteredProtocol<TMessage>, (), CustomProtoHandlerOut<TMessage>>> {
let return_value;
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
error!(target: "sub-libp2p", "Handler with {:?} is in poisoned state",
self.remote_peer_id);
return_value = None;
ProtocolState::Poisoned
self.state = ProtocolState::Poisoned;
None
}
ProtocolState::Init { substreams, mut init_deadline } => {
@@ -361,8 +366,8 @@ where
Err(_) => error!(target: "sub-libp2p", "Tokio timer has errored")
}
return_value = None;
ProtocolState::Init { substreams, init_deadline }
self.state = ProtocolState::Init { substreams, init_deadline };
None
}
ProtocolState::Opening { mut deadline } => {
@@ -373,63 +378,74 @@ where
is_severe: true,
error: "Timeout when opening protocol".to_string().into(),
};
return_value = Some(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Opening { deadline }
self.state = ProtocolState::Opening { deadline };
Some(ProtocolsHandlerEvent::Custom(event))
},
Ok(Async::NotReady) => {
return_value = None;
ProtocolState::Opening { deadline }
self.state = ProtocolState::Opening { deadline };
None
},
Err(_) => {
error!(target: "sub-libp2p", "Tokio timer has errored");
deadline.reset(self.clock.now() + Duration::from_secs(60));
return_value = None;
ProtocolState::Opening { deadline }
self.state = ProtocolState::Opening { deadline };
None
},
}
}
ProtocolState::Normal { mut substream, shutdown } => {
match substream.poll() {
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
let event = CustomProtoHandlerOut::CustomMessage {
message
};
return_value = Some(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal { substream, shutdown }
},
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => {
let event = CustomProtoHandlerOut::Clogged {
messages,
};
return_value = Some(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal { substream, shutdown }
}
Ok(Async::NotReady) => {
return_value = None;
ProtocolState::Normal { substream, shutdown }
}
Ok(Async::Ready(None)) => {
let event = CustomProtoHandlerOut::CustomProtocolClosed {
result: Ok(())
};
return_value = Some(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
ProtocolState::Normal { mut substreams, shutdown } => {
for n in (0..substreams.len()).rev() {
let mut substream = substreams.swap_remove(n);
match substream.poll() {
Ok(Async::NotReady) => substreams.push(substream),
Ok(Async::Ready(Some(RegisteredProtocolEvent::Message(message)))) => {
let event = CustomProtoHandlerOut::CustomMessage {
message
};
substreams.push(substream);
self.state = ProtocolState::Normal { substreams, shutdown };
return Some(ProtocolsHandlerEvent::Custom(event));
},
Ok(Async::Ready(Some(RegisteredProtocolEvent::Clogged { messages }))) => {
let event = CustomProtoHandlerOut::Clogged {
messages,
};
substreams.push(substream);
self.state = ProtocolState::Normal { substreams, shutdown };
return Some(ProtocolsHandlerEvent::Custom(event));
}
}
Err(err) => {
let event = CustomProtoHandlerOut::CustomProtocolClosed {
result: Err(err),
};
return_value = Some(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
Ok(Async::Ready(None)) => {
let event = CustomProtoHandlerOut::CustomProtocolClosed {
result: Ok(())
};
substreams.push(substream);
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
};
return Some(ProtocolsHandlerEvent::Custom(event));
}
Err(err) => {
if substreams.is_empty() {
let event = CustomProtoHandlerOut::CustomProtocolClosed {
result: Err(err),
};
self.state = ProtocolState::Disabled {
shutdown: shutdown.into_iter().collect(),
reenable: true
};
return Some(ProtocolsHandlerEvent::Custom(event));
} else {
debug!(target: "sub-libp2p", "Error on extra substream: {:?}", err);
}
}
}
}
// This code is reached is none if and only if none of the substreams are in a ready state.
self.state = ProtocolState::Normal { substreams, shutdown };
None
}
ProtocolState::Disabled { mut shutdown, reenable } => {
@@ -437,21 +453,19 @@ where
// If `reenable` is `true`, that means we should open the substreams system again
// after all the substreams are closed.
if reenable && shutdown.is_empty() {
return_value = Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
self.state = ProtocolState::Opening {
deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
};
Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.protocol.clone()),
info: (),
});
ProtocolState::Opening {
deadline: Delay::new(self.clock.now() + Duration::from_secs(60))
}
})
} else {
return_value = None;
ProtocolState::Disabled { shutdown, reenable }
self.state = ProtocolState::Disabled { shutdown, reenable };
None
}
}
};
return_value
}
}
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
@@ -481,17 +495,14 @@ where
};
self.events_queue.push(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substream,
substreams: smallvec![substream],
shutdown: SmallVec::new()
}
}
ProtocolState::Normal { substream: existing, mut shutdown } => {
debug!(target: "sub-libp2p", "Received extra substream after having already one \
open in backwards-compatibility mode with {:?}", self.remote_peer_id);
substream.shutdown();
shutdown.push(substream);
ProtocolState::Normal { substream: existing, shutdown }
ProtocolState::Normal { substreams: mut existing, shutdown } => {
existing.push(substream);
ProtocolState::Normal { substreams: existing, shutdown }
}
ProtocolState::Disabled { mut shutdown, .. } => {
@@ -505,8 +516,8 @@ where
/// Sends a message to the remote.
fn send_message(&mut self, message: TMessage) {
match self.state {
ProtocolState::Normal { ref mut substream, .. } =>
substream.send_message(message),
ProtocolState::Normal { ref mut substreams, .. } =>
substreams[0].send_message(message),
_ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \
with {:?}", self.remote_peer_id)