Move the legacy protocol handshake to the legacy substream (#5938)

* Move the legacy protocol handshake to the legacy substream

* Fix tests

* Remove line that wasn't supposed to be committed

* Remove hack

* Rework how it's done

* Some little changes

* update_chain wasn't doing its thing

* Fix service tests not calling update_chain

* Update client/network/src/protocol/generic_proto/behaviour.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* [WIP]

* Revert "[WIP]"

This reverts commit 2b892e6a7637c0b1297e6ecdbb919321c9098ff5.

* Update client/network/src/protocol.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Fix received message not being handshake

* Update client/network/src/protocol/generic_proto/behaviour.rs

Co-authored-by: Max Inden <mail@max-inden.de>

Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Pierre Krieger
2020-07-15 11:29:10 +02:00
committed by GitHub
parent 8f4329823a
commit 4720f0fdda
5 changed files with 104 additions and 131 deletions
@@ -281,6 +281,9 @@ pub enum GenericProtoOut {
CustomProtocolOpen {
/// Id of the peer we are connected to.
peer_id: PeerId,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this is out of the concern of this code.
received_handshake: Vec<u8>,
},
/// Closed a custom protocol with the remote.
@@ -1235,7 +1238,7 @@ impl NetworkBehaviour for GenericProto {
}
}
NotifsHandlerOut::Open { endpoint } => {
NotifsHandlerOut::Open { endpoint, received_handshake } => {
debug!(target: "sub-libp2p",
"Handler({:?}) => Endpoint {:?} open for custom protocols.",
source, endpoint);
@@ -1266,10 +1269,34 @@ impl NetworkBehaviour for GenericProto {
if first {
debug!(target: "sub-libp2p", "External API <= Open({:?})", source);
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source };
let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake };
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
} else {
debug!(target: "sub-libp2p", "Secondary connection opened custom protocol.");
// In normal situations, the handshake is supposed to be a Status message, and
// we would discard Status messages received from secondary connections.
// However, in Polkadot 0.8.10 and below, nodes don't send a Status message
// when opening secondary connections and instead directly consider the
// substream as open. When connecting to such a node, the first message sent
// by the remote will always be considered by our local node as the handshake,
// even when it is a regular message.
// In order to maintain backwards compatibility, we therefore report the
// handshake as if it was a regular message, and the upper layer will ignore
// any superfluous Status message.
// The code below should be removed once Polkadot 0.8.10 and below are no
// longer widely in use, and should be replaced with simply printing a log
// entry.
debug!(
target: "sub-libp2p",
"Handler({:?}) => Secondary connection opened custom protocol",
source
);
trace!(target: "sub-libp2p", "External API <= Message({:?})", source);
let event = GenericProtoOut::LegacyMessage {
peer_id: source,
message: From::from(&received_handshake[..]),
};
self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
}
}
@@ -187,6 +187,9 @@ pub enum NotifsHandlerOut {
Open {
/// The endpoint of the connection that is open for custom protocols.
endpoint: ConnectedPoint,
/// Handshake that was sent to us.
/// This is normally a "Status" message, but this out of the concern of this code.
received_handshake: Vec<u8>,
},
/// The connection is closed for custom protocols.
@@ -465,9 +468,9 @@ impl ProtocolsHandler for NotifsHandler {
protocol: protocol.map_upgrade(EitherUpgrade::B),
info: None,
}),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) =>
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, received_handshake, .. }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
NotifsHandlerOut::Open { endpoint }
NotifsHandlerOut::Open { endpoint, received_handshake }
)),
ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) =>
Poll::Ready(ProtocolsHandlerEvent::Custom(
@@ -150,7 +150,8 @@ enum ProtocolState {
/// Waiting for the behaviour to tell the handler whether it is enabled or disabled.
Init {
/// List of substreams opened by the remote but that haven't been processed yet.
substreams: SmallVec<[RegisteredProtocolSubstream<NegotiatedSubstream>; 6]>,
/// For each substream, also includes the handshake message that we have received.
substreams: SmallVec<[(RegisteredProtocolSubstream<NegotiatedSubstream>, Vec<u8>); 6]>,
/// Deadline after which the initialization is abnormally long.
init_deadline: Delay,
},
@@ -218,6 +219,9 @@ pub enum LegacyProtoHandlerOut {
CustomProtocolOpen {
/// Version of the protocol that has been opened.
version: u8,
/// Handshake message that has been sent to us.
/// This is normally a "Status" message, but this out of the concern of this code.
received_handshake: Vec<u8>,
/// The connected endpoint.
endpoint: ConnectedPoint,
},
@@ -267,7 +271,7 @@ impl LegacyProtoHandler {
ProtocolState::Poisoned
}
ProtocolState::Init { substreams: incoming, .. } => {
ProtocolState::Init { substreams: mut incoming, .. } => {
if incoming.is_empty() {
if let ConnectedPoint::Dialer { .. } = self.endpoint {
self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
@@ -280,12 +284,13 @@ impl LegacyProtoHandler {
}
} else {
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
version: incoming[0].protocol_version(),
endpoint: self.endpoint.clone()
version: incoming[0].0.protocol_version(),
endpoint: self.endpoint.clone(),
received_handshake: mem::replace(&mut incoming[0].1, Vec::new()),
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
substreams: incoming.into_iter().collect(),
substreams: incoming.into_iter().map(|(s, _)| s).collect(),
shutdown: SmallVec::new()
}
}
@@ -309,7 +314,8 @@ impl LegacyProtoHandler {
ProtocolState::Poisoned
}
ProtocolState::Init { substreams: mut shutdown, .. } => {
ProtocolState::Init { substreams: shutdown, .. } => {
let mut shutdown = shutdown.into_iter().map(|(s, _)| s).collect::<SmallVec<[_; 6]>>();
for s in &mut shutdown {
s.shutdown();
}
@@ -464,7 +470,8 @@ impl LegacyProtoHandler {
/// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`.
fn inject_fully_negotiated(
&mut self,
mut substream: RegisteredProtocolSubstream<NegotiatedSubstream>
mut substream: RegisteredProtocolSubstream<NegotiatedSubstream>,
received_handshake: Vec<u8>,
) {
self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) {
ProtocolState::Poisoned => {
@@ -478,14 +485,15 @@ impl LegacyProtoHandler {
error!(target: "sub-libp2p", "Opened dialing substream with {:?} before \
initialization", self.remote_peer_id);
}
substreams.push(substream);
substreams.push((substream, received_handshake));
ProtocolState::Init { substreams, init_deadline }
}
ProtocolState::Opening { .. } => {
let event = LegacyProtoHandlerOut::CustomProtocolOpen {
version: substream.protocol_version(),
endpoint: self.endpoint.clone()
endpoint: self.endpoint.clone(),
received_handshake,
};
self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event));
ProtocolState::Normal {
@@ -535,17 +543,17 @@ impl ProtocolsHandler for LegacyProtoHandler {
fn inject_fully_negotiated_inbound(
&mut self,
proto: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
(substream, handshake): <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
) {
self.inject_fully_negotiated(proto);
self.inject_fully_negotiated(substream, handshake);
}
fn inject_fully_negotiated_outbound(
&mut self,
proto: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
(substream, handshake): <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
) {
self.inject_fully_negotiated(proto);
self.inject_fully_negotiated(substream, handshake);
}
fn inject_event(&mut self, message: LegacyProtoHandlerIn) {
@@ -248,7 +248,7 @@ impl ProtocolName for RegisteredProtocolName {
impl<TSubstream> InboundUpgrade<TSubstream> for RegisteredProtocol
where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = RegisteredProtocolSubstream<TSubstream>;
type Output = (RegisteredProtocolSubstream<TSubstream>, Vec<u8>);
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, io::Error>> + Send>>;
type Error = io::Error;
@@ -266,8 +266,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
let handshake = BytesMut::from(&self.handshake_message.read()[..]);
framed.send(handshake).await?;
let received_handshake = framed.next().await
.ok_or_else(|| io::ErrorKind::UnexpectedEof)??;
Ok(RegisteredProtocolSubstream {
Ok((RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Listener,
send_queue: VecDeque::new(),
@@ -275,7 +277,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
})
}, received_handshake.to_vec()))
})
}
}
@@ -301,8 +303,12 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
let handshake = BytesMut::from(&self.handshake_message.read()[..]);
framed.send(handshake).await?;
let received_handshake = framed.next().await
.ok_or_else(|| {
io::Error::new(io::ErrorKind::UnexpectedEof, "Failed to receive handshake")
})??;
Ok(RegisteredProtocolSubstream {
Ok((RegisteredProtocolSubstream {
is_closing: false,
endpoint: Endpoint::Dialer,
send_queue: VecDeque::new(),
@@ -310,7 +316,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
inner: framed.fuse(),
protocol_version: info.version,
clogged_fuse: false,
})
}, received_handshake.to_vec()))
})
}
}