diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 601c7d9c99..626cb04389 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -135,7 +135,6 @@ mod rep { } struct Metrics { - handshaking_peers: Gauge, obsolete_requests: Gauge, peers: Gauge, queued_blocks: Gauge, @@ -148,10 +147,6 @@ struct Metrics { impl Metrics { fn register(r: &Registry) -> Result { Ok(Metrics { - handshaking_peers: { - let g = Gauge::new("sync_handshaking_peers", "Number of newly connected peers")?; - register(g, r)? - }, obsolete_requests: { let g = Gauge::new("sync_obsolete_requests", "Number of obsolete requests")?; register(g, r)? @@ -239,8 +234,6 @@ pub struct Protocol { /// List of nodes for which we perform additional logging because they are important for the /// user. important_peers: HashSet, - // Connected peers pending Status message. - handshaking_peers: HashMap, /// Used to report reputation changes. peerset_handle: sc_peerset::PeersetHandle, transaction_pool: Arc>, @@ -269,13 +262,6 @@ struct PacketStats { count_in: u64, count_out: u64, } - -/// A peer that we are connected to -/// and from whom we have not yet received a Status message. -struct HandshakingPeer { - timestamp: Instant, -} - /// Peer information #[derive(Debug, Clone)] struct Peer { @@ -426,7 +412,7 @@ impl Protocol { versions, build_status_message(&config, &chain), peerset, - queue_size_report + queue_size_report, ); let mut legacy_equiv_by_name = HashMap::new(); @@ -466,7 +452,6 @@ impl Protocol { }, genesis_hash: info.genesis_hash, sync, - handshaking_peers: HashMap::new(), important_peers, transaction_pool, finality_proof_provider, @@ -616,7 +601,8 @@ impl Protocol { stats.count_in += 1; match message { - GenericMessage::Status(s) => return self.on_status_message(who, s), + GenericMessage::Status(_) => + debug!(target: "sub-libp2p", "Received unexpected Status"), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { let outcome = self.on_block_response(who.clone(), r); @@ -707,12 +693,6 @@ impl Protocol { update_peer_request::(&mut self.context_data.peers, who, request) } - /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, who: PeerId) { - trace!(target: "sync", "Connecting {}", who); - self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() }); - } - /// Called by peer when it is disconnecting pub fn on_peer_disconnected(&mut self, peer: PeerId) -> CustomMessageOutcome { if self.important_peers.contains(&peer) { @@ -721,12 +701,7 @@ impl Protocol { trace!(target: "sync", "{} disconnected", peer); } - // lock all the the peer lists so that add/remove peer events are in order - let removed = { - self.handshaking_peers.remove(&peer); - self.context_data.peers.remove(&peer) - }; - if let Some(_peer_data) = removed { + if let Some(_peer_data) = self.context_data.peers.remove(&peer) { self.sync.peer_disconnected(&peer); // Notify all the notification protocols as closed. @@ -955,16 +930,6 @@ impl Protocol { aborting.push(who.clone()); } } - for (who, _) in self.handshaking_peers.iter() - .filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC) - { - log!( - target: "sync", - if self.important_peers.contains(who) { Level::Warn } else { Level::Trace }, - "Handshake timeout {}", who - ); - aborting.push(who.clone()); - } } for p in aborting { @@ -973,8 +938,8 @@ impl Protocol { } } - /// Called by peer to report status - fn on_status_message(&mut self, who: PeerId, status: message::Status) -> CustomMessageOutcome { + /// Called on receipt of a status message via the legacy protocol on the first connection between two peers. + pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status) -> CustomMessageOutcome { trace!(target: "sync", "New peer {} {:?}", who, status); let _protocol_version = { if self.context_data.peers.contains_key(&who) { @@ -1041,23 +1006,13 @@ impl Protocol { } } - let info = match self.handshaking_peers.remove(&who) { - Some(_handshaking) => { - PeerInfo { - protocol_version: status.version, - roles: status.roles, - best_hash: status.best_hash, - best_number: status.best_number - } - }, - None => { - error!(target: "sync", "Received status from previously unconnected node {}", who); - return CustomMessageOutcome::None; - }, - }; - let peer = Peer { - info, + info: PeerInfo { + protocol_version: status.version, + roles: status.roles, + best_hash: status.best_hash, + best_number: status.best_number + }, block_request: None, known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS) .expect("Constant is nonzero")), @@ -1837,9 +1792,6 @@ impl Protocol { } metrics.obsolete_requests.set(obsolete_requests); - let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX); - metrics.handshaking_peers.set(n); - let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX); metrics.peers.set(n); @@ -2042,9 +1994,31 @@ impl NetworkBehaviour for Protocol { }; let outcome = match event { - GenericProtoOut::CustomProtocolOpen { peer_id, .. } => { - self.on_peer_connected(peer_id); - CustomMessageOutcome::None + GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, .. } => { + match as Decode>::decode(&mut &received_handshake[..]) { + Ok(GenericMessage::Status(handshake)) => self.on_peer_connected(peer_id, handshake), + Ok(msg) => { + debug!( + target: "sync", + "Expected Status message from {}, but got {:?}", + peer_id, + msg, + ); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + CustomMessageOutcome::None + } + Err(err) => { + debug!( + target: "sync", + "Couldn't decode handshake sent by {}: {:?}: {}", + peer_id, + received_handshake, + err.what() + ); + self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); + CustomMessageOutcome::None + } + } } GenericProtoOut::CustomProtocolClosed { peer_id, .. } => { self.on_peer_disconnected(peer_id) @@ -2130,48 +2104,3 @@ impl Drop for Protocol { debug!(target: "sync", "Network stats:\n{}", self.format_stats()); } } - -#[cfg(test)] -mod tests { - use crate::PeerId; - use crate::config::EmptyTransactionPool; - use super::{CustomMessageOutcome, Protocol, ProtocolConfig}; - - use sp_consensus::block_validation::DefaultBlockAnnounceValidator; - use std::sync::Arc; - use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt}; - use substrate_test_runtime_client::runtime::{Block, Hash}; - - #[test] - fn no_handshake_no_notif_closed() { - let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0); - - let (mut protocol, _) = Protocol::::new( - ProtocolConfig::default(), - PeerId::random(), - client.clone(), - Arc::new(EmptyTransactionPool), - None, - None, - From::from(&b"test"[..]), - sc_peerset::PeersetConfig { - in_peers: 10, - out_peers: 10, - bootnodes: Vec::new(), - reserved_only: false, - priority_groups: Vec::new(), - }, - Box::new(DefaultBlockAnnounceValidator), - None, - Default::default(), - None, - ).unwrap(); - - let dummy_peer_id = PeerId::random(); - let _ = protocol.on_peer_connected(dummy_peer_id.clone()); - match protocol.on_peer_disconnected(dummy_peer_id) { - CustomMessageOutcome::None => {}, - _ => panic!() - }; - } -} diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index 75f011d9f8..0e56b03b7a 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -281,6 +281,9 @@ pub enum GenericProtoOut { CustomProtocolOpen { /// Id of the peer we are connected to. peer_id: PeerId, + /// Handshake that was sent to us. + /// This is normally a "Status" message, but this is out of the concern of this code. + received_handshake: Vec, }, /// Closed a custom protocol with the remote. @@ -1235,7 +1238,7 @@ impl NetworkBehaviour for GenericProto { } } - NotifsHandlerOut::Open { endpoint } => { + NotifsHandlerOut::Open { endpoint, received_handshake } => { debug!(target: "sub-libp2p", "Handler({:?}) => Endpoint {:?} open for custom protocols.", source, endpoint); @@ -1266,10 +1269,34 @@ impl NetworkBehaviour for GenericProto { if first { debug!(target: "sub-libp2p", "External API <= Open({:?})", source); - let event = GenericProtoOut::CustomProtocolOpen { peer_id: source }; + let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake }; self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + } else { - debug!(target: "sub-libp2p", "Secondary connection opened custom protocol."); + // In normal situations, the handshake is supposed to be a Status message, and + // we would discard Status messages received from secondary connections. + // However, in Polkadot 0.8.10 and below, nodes don't send a Status message + // when opening secondary connections and instead directly consider the + // substream as open. When connecting to such a node, the first message sent + // by the remote will always be considered by our local node as the handshake, + // even when it is a regular message. + // In order to maintain backwards compatibility, we therefore report the + // handshake as if it was a regular message, and the upper layer will ignore + // any superfluous Status message. + // The code below should be removed once Polkadot 0.8.10 and below are no + // longer widely in use, and should be replaced with simply printing a log + // entry. + debug!( + target: "sub-libp2p", + "Handler({:?}) => Secondary connection opened custom protocol", + source + ); + trace!(target: "sub-libp2p", "External API <= Message({:?})", source); + let event = GenericProtoOut::LegacyMessage { + peer_id: source, + message: From::from(&received_handshake[..]), + }; + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } } diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs index 1364ecf39d..3403f7dd82 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/group.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs @@ -187,6 +187,9 @@ pub enum NotifsHandlerOut { Open { /// The endpoint of the connection that is open for custom protocols. endpoint: ConnectedPoint, + /// Handshake that was sent to us. + /// This is normally a "Status" message, but this out of the concern of this code. + received_handshake: Vec, }, /// The connection is closed for custom protocols. @@ -465,9 +468,9 @@ impl ProtocolsHandler for NotifsHandler { protocol: protocol.map_upgrade(EitherUpgrade::B), info: None, }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, .. }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, received_handshake, .. }) => Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Open { endpoint } + NotifsHandlerOut::Open { endpoint, received_handshake } )), ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => Poll::Ready(ProtocolsHandlerEvent::Custom( diff --git a/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs index 1469380fb7..71d6175f06 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -150,7 +150,8 @@ enum ProtocolState { /// Waiting for the behaviour to tell the handler whether it is enabled or disabled. Init { /// List of substreams opened by the remote but that haven't been processed yet. - substreams: SmallVec<[RegisteredProtocolSubstream; 6]>, + /// For each substream, also includes the handshake message that we have received. + substreams: SmallVec<[(RegisteredProtocolSubstream, Vec); 6]>, /// Deadline after which the initialization is abnormally long. init_deadline: Delay, }, @@ -218,6 +219,9 @@ pub enum LegacyProtoHandlerOut { CustomProtocolOpen { /// Version of the protocol that has been opened. version: u8, + /// Handshake message that has been sent to us. + /// This is normally a "Status" message, but this out of the concern of this code. + received_handshake: Vec, /// The connected endpoint. endpoint: ConnectedPoint, }, @@ -267,7 +271,7 @@ impl LegacyProtoHandler { ProtocolState::Poisoned } - ProtocolState::Init { substreams: incoming, .. } => { + ProtocolState::Init { substreams: mut incoming, .. } => { if incoming.is_empty() { if let ConnectedPoint::Dialer { .. } = self.endpoint { self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest { @@ -280,12 +284,13 @@ impl LegacyProtoHandler { } } else { let event = LegacyProtoHandlerOut::CustomProtocolOpen { - version: incoming[0].protocol_version(), - endpoint: self.endpoint.clone() + version: incoming[0].0.protocol_version(), + endpoint: self.endpoint.clone(), + received_handshake: mem::replace(&mut incoming[0].1, Vec::new()), }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { - substreams: incoming.into_iter().collect(), + substreams: incoming.into_iter().map(|(s, _)| s).collect(), shutdown: SmallVec::new() } } @@ -309,7 +314,8 @@ impl LegacyProtoHandler { ProtocolState::Poisoned } - ProtocolState::Init { substreams: mut shutdown, .. } => { + ProtocolState::Init { substreams: shutdown, .. } => { + let mut shutdown = shutdown.into_iter().map(|(s, _)| s).collect::>(); for s in &mut shutdown { s.shutdown(); } @@ -464,7 +470,8 @@ impl LegacyProtoHandler { /// Called by `inject_fully_negotiated_inbound` and `inject_fully_negotiated_outbound`. fn inject_fully_negotiated( &mut self, - mut substream: RegisteredProtocolSubstream + mut substream: RegisteredProtocolSubstream, + received_handshake: Vec, ) { self.state = match mem::replace(&mut self.state, ProtocolState::Poisoned) { ProtocolState::Poisoned => { @@ -478,14 +485,15 @@ impl LegacyProtoHandler { error!(target: "sub-libp2p", "Opened dialing substream with {:?} before \ initialization", self.remote_peer_id); } - substreams.push(substream); + substreams.push((substream, received_handshake)); ProtocolState::Init { substreams, init_deadline } } ProtocolState::Opening { .. } => { let event = LegacyProtoHandlerOut::CustomProtocolOpen { version: substream.protocol_version(), - endpoint: self.endpoint.clone() + endpoint: self.endpoint.clone(), + received_handshake, }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event)); ProtocolState::Normal { @@ -535,17 +543,17 @@ impl ProtocolsHandler for LegacyProtoHandler { fn inject_fully_negotiated_inbound( &mut self, - proto: >::Output + (substream, handshake): >::Output ) { - self.inject_fully_negotiated(proto); + self.inject_fully_negotiated(substream, handshake); } fn inject_fully_negotiated_outbound( &mut self, - proto: >::Output, + (substream, handshake): >::Output, _: Self::OutboundOpenInfo ) { - self.inject_fully_negotiated(proto); + self.inject_fully_negotiated(substream, handshake); } fn inject_event(&mut self, message: LegacyProtoHandlerIn) { diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs index 6a8619ee4e..ce2d1934c0 100644 --- a/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs @@ -248,7 +248,7 @@ impl ProtocolName for RegisteredProtocolName { impl InboundUpgrade for RegisteredProtocol where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - type Output = RegisteredProtocolSubstream; + type Output = (RegisteredProtocolSubstream, Vec); type Future = Pin> + Send>>; type Error = io::Error; @@ -266,8 +266,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, let handshake = BytesMut::from(&self.handshake_message.read()[..]); framed.send(handshake).await?; + let received_handshake = framed.next().await + .ok_or_else(|| io::ErrorKind::UnexpectedEof)??; - Ok(RegisteredProtocolSubstream { + Ok((RegisteredProtocolSubstream { is_closing: false, endpoint: Endpoint::Listener, send_queue: VecDeque::new(), @@ -275,7 +277,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, inner: framed.fuse(), protocol_version: info.version, clogged_fuse: false, - }) + }, received_handshake.to_vec())) }) } } @@ -301,8 +303,12 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, let handshake = BytesMut::from(&self.handshake_message.read()[..]); framed.send(handshake).await?; + let received_handshake = framed.next().await + .ok_or_else(|| { + io::Error::new(io::ErrorKind::UnexpectedEof, "Failed to receive handshake") + })??; - Ok(RegisteredProtocolSubstream { + Ok((RegisteredProtocolSubstream { is_closing: false, endpoint: Endpoint::Dialer, send_queue: VecDeque::new(), @@ -310,7 +316,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, inner: framed.fuse(), protocol_version: info.version, clogged_fuse: false, - }) + }, received_handshake.to_vec())) }) } }