diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index cb9c552115..880b381e66 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -20,6 +20,8 @@ use crate::{ Event, ObservedRole, DhtEvent, ExHashT, }; use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol}; + +use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::kad::record; @@ -135,7 +137,11 @@ impl Behaviour { engine_id: ConsensusEngineId, protocol_name: impl Into>, ) { - let list = self.substrate.register_notifications_protocol(engine_id, protocol_name); + // This is the message that we will send to the remote as part of the initial handshake. + // At the moment, we force this to be an encoded `Roles`. + let handshake_message = Roles::from(&self.role).encode(); + + let list = self.substrate.register_notifications_protocol(engine_id, protocol_name, handshake_message); for (remote, roles) in list { let role = reported_roles_to_observed_role(&self.role, remote, roles); let ev = Event::NotificationStreamOpened { diff --git a/substrate/client/network/src/lib.rs b/substrate/client/network/src/lib.rs index ed96215d20..44bb1516bd 100644 --- a/substrate/client/network/src/lib.rs +++ b/substrate/client/network/src/lib.rs @@ -210,7 +210,14 @@ //! notifications protocol. //! //! At the moment, for backwards-compatibility, notification protocols are tied to the legacy -//! Substrate substream. In the future, though, it will no longer be the case. +//! Substrate substream. Additionally, the handshake message is hardcoded to be a single 8-bits +//! integer representing the role of the node: +//! +//! - 1 for a full node. +//! - 2 for a light node. +//! - 4 for an authority. +//! +//! In the future, though, these restrictions will be removed. //! //! # Usage //! diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index 6ad8259484..42084b7ce3 100644 --- a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -1043,12 +1043,13 @@ impl Protocol { &'a mut self, engine_id: ConsensusEngineId, protocol_name: impl Into>, + handshake_message: Vec, ) -> impl ExactSizeIterator + 'a { let protocol_name = protocol_name.into(); if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() { error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name); } else { - self.behaviour.register_notif_protocol(protocol_name.clone(), Vec::new()); + self.behaviour.register_notif_protocol(protocol_name.clone(), handshake_message); self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id)); } diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index e62edb3733..35a8b03d36 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -113,6 +113,8 @@ pub struct GenericProto { legacy_protocol: RegisteredProtocol, /// Notification protocols. Entries are only ever added and not removed. + /// Contains, for each protocol, the protocol name and the message to send as part of the + /// initial handshake. notif_protocols: Vec<(Cow<'static, [u8]>, Vec)>, /// Receiver for instructions about who to connect to or disconnect from. diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs index 46b759d458..04293c7c9f 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/group.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs @@ -75,11 +75,12 @@ use std::{borrow::Cow, error, io, str, task::{Context, Poll}}; /// /// See the documentation at the module level for more information. pub struct NotifsHandlerProto { - /// Prototypes for handlers for inbound substreams. - in_handlers: Vec, + /// Prototypes for handlers for inbound substreams, and the message we respond with in the + /// handshake. + in_handlers: Vec<(NotifsInHandlerProto, Vec)>, - /// Prototypes for handlers for outbound substreams. - out_handlers: Vec, + /// Prototypes for handlers for outbound substreams, and the initial handshake message we send. + out_handlers: Vec<(NotifsOutHandlerProto, Vec)>, /// Prototype for handler for backwards-compatibility. legacy: LegacyProtoHandlerProto, @@ -89,11 +90,11 @@ pub struct NotifsHandlerProto { /// /// See the documentation at the module level for more information. pub struct NotifsHandler { - /// Handlers for inbound substreams. - in_handlers: Vec, + /// Handlers for inbound substreams, and the message we respond with in the handshake. + in_handlers: Vec<(NotifsInHandler, Vec)>, - /// Handlers for outbound substreams. - out_handlers: Vec, + /// Handlers for outbound substreams, and the initial handshake message we send. + out_handlers: Vec<(NotifsOutHandler, Vec)>, /// Handler for backwards-compatibility. legacy: LegacyProtoHandler, @@ -119,7 +120,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto { fn inbound_protocol(&self) -> SelectUpgrade, RegisteredProtocol> { let in_handlers = self.in_handlers.iter() - .map(|h| h.inbound_protocol()) + .map(|(h, _)| h.inbound_protocol()) .collect::>(); SelectUpgrade::new(in_handlers, self.legacy.inbound_protocol()) @@ -129,11 +130,11 @@ impl IntoProtocolsHandler for NotifsHandlerProto { NotifsHandler { in_handlers: self.in_handlers .into_iter() - .map(|p| p.into_handler(remote_peer_id, connected_point)) + .map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg)) .collect(), out_handlers: self.out_handlers .into_iter() - .map(|p| p.into_handler(remote_peer_id, connected_point)) + .map(|(proto, msg)| (proto.into_handler(remote_peer_id, connected_point), msg)) .collect(), legacy: self.legacy.into_handler(remote_peer_id, connected_point), enabled: EnabledState::Initial, @@ -232,28 +233,42 @@ pub enum NotifsHandlerOut { impl NotifsHandlerProto { /// Builds a new handler. /// + /// `list` is a list of notification protocols names, and the message to send as part of the + /// handshake. At the moment, the message is always the same whether we open a substream + /// ourselves or respond to handshake from the remote. + /// /// The `queue_size_report` is an optional Prometheus metric that can report the size of the /// messages queue. If passed, it must have one label for the protocol name. - pub fn new(legacy: RegisteredProtocol, list: impl Into, Vec)>>, queue_size_report: Option) -> Self { + pub fn new( + legacy: RegisteredProtocol, + list: impl Into, Vec)>>, + queue_size_report: Option + ) -> Self { let list = list.into(); let out_handlers = list .clone() .into_iter() - .map(|(p, _)| { + .map(|(proto_name, initial_message)| { let queue_size_report = queue_size_report.as_ref().and_then(|qs| { - if let Ok(utf8) = str::from_utf8(&p) { + if let Ok(utf8) = str::from_utf8(&proto_name) { Some(qs.with_label_values(&[utf8])) } else { - log::warn!("Ignoring Prometheus metric because {:?} isn't UTF-8", p); + log::warn!("Ignoring Prometheus metric because {:?} isn't UTF-8", proto_name); None } }); - NotifsOutHandlerProto::new(p, queue_size_report) + + (NotifsOutHandlerProto::new(proto_name, queue_size_report), initial_message) }).collect(); + let in_handlers = list.clone() + .into_iter() + .map(|(proto_name, msg)| (NotifsInHandlerProto::new(proto_name), msg)) + .collect(); + NotifsHandlerProto { - in_handlers: list.clone().into_iter().map(|(p, _)| NotifsInHandlerProto::new(p)).collect(), + in_handlers, out_handlers, legacy: LegacyProtoHandlerProto::new(legacy), } @@ -277,7 +292,7 @@ impl ProtocolsHandler for NotifsHandler { fn listen_protocol(&self) -> SubstreamProtocol { let in_handlers = self.in_handlers.iter() - .map(|h| h.listen_protocol().into_upgrade().1) + .map(|(h, _)| h.listen_protocol().into_upgrade().1) .collect::>(); let proto = SelectUpgrade::new(in_handlers, self.legacy.listen_protocol().into_upgrade().1); @@ -290,7 +305,7 @@ impl ProtocolsHandler for NotifsHandler { ) { match out { EitherOutput::First((out, num)) => - self.in_handlers[num].inject_fully_negotiated_inbound(out), + self.in_handlers[num].0.inject_fully_negotiated_inbound(out), EitherOutput::Second(out) => self.legacy.inject_fully_negotiated_inbound(out), } @@ -303,7 +318,7 @@ impl ProtocolsHandler for NotifsHandler { ) { match (out, num) { (EitherOutput::First(out), Some(num)) => - self.out_handlers[num].inject_fully_negotiated_outbound(out, ()), + self.out_handlers[num].0.inject_fully_negotiated_outbound(out, ()), (EitherOutput::Second(out), None) => self.legacy.inject_fully_negotiated_outbound(out, ()), _ => error!("inject_fully_negotiated_outbound called with wrong parameters"), @@ -318,13 +333,15 @@ impl ProtocolsHandler for NotifsHandler { } self.enabled = EnabledState::Enabled; self.legacy.inject_event(LegacyProtoHandlerIn::Enable); - for handler in &mut self.out_handlers { + for (handler, initial_message) in &mut self.out_handlers { handler.inject_event(NotifsOutHandlerIn::Enable { - initial_message: vec![] + initial_message: initial_message.clone(), }); } for num in self.pending_in.drain(..) { - self.in_handlers[num].inject_event(NotifsInHandlerIn::Accept(vec![])); + let handshake_message = self.in_handlers[num].1.clone(); + self.in_handlers[num].0 + .inject_event(NotifsInHandlerIn::Accept(handshake_message)); } }, NotifsHandlerIn::Disable => { @@ -335,19 +352,19 @@ impl ProtocolsHandler for NotifsHandler { // The notifications protocols start in the disabled state. If we were in the // "Initial" state, then we shouldn't disable the notifications protocols again. if self.enabled != EnabledState::Initial { - for handler in &mut self.out_handlers { + for (handler, _) in &mut self.out_handlers { handler.inject_event(NotifsOutHandlerIn::Disable); } } self.enabled = EnabledState::Disabled; for num in self.pending_in.drain(..) { - self.in_handlers[num].inject_event(NotifsInHandlerIn::Refuse); + self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Refuse); } }, NotifsHandlerIn::SendLegacy { message } => self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }), NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => { - for handler in &mut self.out_handlers { + for (handler, _) in &mut self.out_handlers { if handler.protocol_name() != &protocol_name[..] { continue; } @@ -372,21 +389,21 @@ impl ProtocolsHandler for NotifsHandler { ) { match (err, num) { (ProtocolsHandlerUpgrErr::Timeout, Some(num)) => - self.out_handlers[num].inject_dial_upgrade_error( + self.out_handlers[num].0.inject_dial_upgrade_error( (), ProtocolsHandlerUpgrErr::Timeout ), (ProtocolsHandlerUpgrErr::Timeout, None) => self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout), (ProtocolsHandlerUpgrErr::Timer, Some(num)) => - self.out_handlers[num].inject_dial_upgrade_error( + self.out_handlers[num].0.inject_dial_upgrade_error( (), ProtocolsHandlerUpgrErr::Timer ), (ProtocolsHandlerUpgrErr::Timer, None) => self.legacy.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timer), (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)), Some(num)) => - self.out_handlers[num].inject_dial_upgrade_error( + self.out_handlers[num].0.inject_dial_upgrade_error( (), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) ), @@ -396,7 +413,7 @@ impl ProtocolsHandler for NotifsHandler { ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) ), (ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))), Some(num)) => - self.out_handlers[num].inject_dial_upgrade_error( + self.out_handlers[num].0.inject_dial_upgrade_error( (), ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) ), @@ -417,7 +434,7 @@ impl ProtocolsHandler for NotifsHandler { return KeepAlive::Yes; } - for handler in &self.in_handlers { + for (handler, _) in &self.in_handlers { let val = handler.connection_keep_alive(); if val.is_yes() { return KeepAlive::Yes; @@ -425,7 +442,7 @@ impl ProtocolsHandler for NotifsHandler { if ret < val { ret = val; } } - for handler in &self.out_handlers { + for (handler, _) in &self.out_handlers { let val = handler.connection_keep_alive(); if val.is_yes() { return KeepAlive::Yes; @@ -474,7 +491,7 @@ impl ProtocolsHandler for NotifsHandler { } } - for (handler_num, handler) in self.in_handlers.iter_mut().enumerate() { + for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { while let Poll::Ready(ev) = handler.poll(cx) { match ev { ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => @@ -484,7 +501,7 @@ impl ProtocolsHandler for NotifsHandler { match self.enabled { EnabledState::Initial => self.pending_in.push(handler_num), EnabledState::Enabled => - handler.inject_event(NotifsInHandlerIn::Accept(vec![])), + handler.inject_event(NotifsInHandlerIn::Accept(handshake_message.clone())), EnabledState::Disabled => handler.inject_event(NotifsInHandlerIn::Refuse), }, @@ -504,7 +521,7 @@ impl ProtocolsHandler for NotifsHandler { } } - for (handler_num, handler) in self.out_handlers.iter_mut().enumerate() { + for (handler_num, (handler, _)) in self.out_handlers.iter_mut().enumerate() { while let Poll::Ready(ev) = handler.poll(cx) { match ev { ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } =>