diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 5967613b98..9a466388f4 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -17,10 +17,11 @@ use crate::{ config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests, peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, - protocol::{message::{self, Roles}, CustomMessageOutcome, Protocol}, - Event, ObservedRole, DhtEvent, ExHashT, + protocol::{message::{self, Roles}, CustomMessageOutcome, NotificationsSink, Protocol}, + ObservedRole, DhtEvent, ExHashT, }; +use bytes::Bytes; use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; @@ -98,11 +99,53 @@ pub enum BehaviourOut { request_duration: Duration, }, - /// Any event represented by the [`Event`] enum. + /// Opened a substream with the given node with the given notifications protocol. /// - /// > **Note**: The [`Event`] enum contains the events that are available through the public - /// > API of the library. - Event(Event), + /// The protocol is always one of the notification protocols that have been registered. + NotificationStreamOpened { + /// Node we opened the substream with. + remote: PeerId, + /// The concerned protocol. Each protocol uses a different substream. + engine_id: ConsensusEngineId, + /// Object that permits sending notifications to the peer. + notifications_sink: NotificationsSink, + /// Role of the remote. + role: ObservedRole, + }, + + /// The [`NotificationsSink`] object used to send notifications with the given peer must be + /// replaced with a new one. + /// + /// This event is typically emitted when a transport-level connection is closed and we fall + /// back to a secondary connection. + NotificationStreamReplaced { + /// Id of the peer we are connected to. + remote: PeerId, + /// The concerned protocol. Each protocol uses a different substream. + engine_id: ConsensusEngineId, + /// Replacement for the previous [`NotificationsSink`]. + notifications_sink: NotificationsSink, + }, + + /// Closed a substream with the given node. Always matches a corresponding previous + /// `NotificationStreamOpened` message. + NotificationStreamClosed { + /// Node we closed the substream with. + remote: PeerId, + /// The concerned protocol. Each protocol uses a different substream. + engine_id: ConsensusEngineId, + }, + + /// Received one or more messages from the given node using the given protocol. + NotificationsReceived { + /// Node we received the message from. + remote: PeerId, + /// Concerned protocol and associated message. + messages: Vec<(ConsensusEngineId, Bytes)>, + }, + + /// Event generated by a DHT. + Dht(DhtEvent), } impl Behaviour { @@ -165,8 +208,6 @@ impl Behaviour { /// Registers a new notifications protocol. /// - /// After that, you can call `write_notifications`. - /// /// Please call `event_stream` before registering a protocol, otherwise you may miss events /// about the protocol that you have registered. 
/// @@ -182,14 +223,14 @@ impl Behaviour { let handshake_message = Roles::from(&self.role).encode(); let list = self.substrate.register_notifications_protocol(engine_id, protocol_name, handshake_message); - for (remote, roles) in list { + for (remote, roles, notifications_sink) in list { let role = reported_roles_to_observed_role(&self.role, remote, roles); - let ev = Event::NotificationStreamOpened { + self.events.push_back(BehaviourOut::NotificationStreamOpened { remote: remote.clone(), engine_id, role, - }; - self.events.push_back(BehaviourOut::Event(ev)); + notifications_sink: notifications_sink.clone(), + }); } } @@ -278,26 +319,34 @@ Behaviour { CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => { self.finality_proof_requests.send_request(&target, block_hash, request); }, - CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => { + CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles, notifications_sink } => { let role = reported_roles_to_observed_role(&self.role, &remote, roles); for engine_id in protocols { - self.events.push_back(BehaviourOut::Event(Event::NotificationStreamOpened { + self.events.push_back(BehaviourOut::NotificationStreamOpened { remote: remote.clone(), engine_id, role: role.clone(), - })); + notifications_sink: notifications_sink.clone(), + }); } }, - CustomMessageOutcome::NotificationStreamClosed { remote, protocols } => + CustomMessageOutcome::NotificationStreamReplaced { remote, protocols, notifications_sink } => for engine_id in protocols { - self.events.push_back(BehaviourOut::Event(Event::NotificationStreamClosed { + self.events.push_back(BehaviourOut::NotificationStreamReplaced { remote: remote.clone(), engine_id, - })); + notifications_sink: notifications_sink.clone(), + }); + }, + CustomMessageOutcome::NotificationStreamClosed { remote, protocols } => + for engine_id in protocols { + self.events.push_back(BehaviourOut::NotificationStreamClosed { + remote: remote.clone(), + engine_id, + }); }, CustomMessageOutcome::NotificationsReceived { remote, messages } => { - let ev = Event::NotificationsReceived { remote, messages }; - self.events.push_back(BehaviourOut::Event(ev)); + self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages }); }, CustomMessageOutcome::PeerNewBest(peer_id, number) => { self.light_client_handler.update_best_block(&peer_id, number); @@ -393,16 +442,16 @@ impl NetworkBehaviourEventProcess self.substrate.add_discovered_nodes(iter::once(peer_id)); } DiscoveryOut::ValueFound(results) => { - self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results)))); + self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results))); } DiscoveryOut::ValueNotFound(key) => { - self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key)))); + self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key))); } DiscoveryOut::ValuePut(key) => { - self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key)))); + self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key))); } DiscoveryOut::ValuePutFailed(key) => { - self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key)))); + self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key))); } DiscoveryOut::RandomKademliaStarted(protocols) => { for protocol in protocols { diff --git a/substrate/client/network/src/protocol.rs b/substrate/client/network/src/protocol.rs index d606a1be98..d3a729cc8d 100644 --- 
a/substrate/client/network/src/protocol.rs +++ b/substrate/client/network/src/protocol.rs @@ -47,8 +47,8 @@ use sp_runtime::traits::{ }; use sp_arithmetic::traits::SaturatedConversion; use message::{BlockAnnounce, Message}; -use message::generic::{Message as GenericMessage, ConsensusMessage, Roles}; -use prometheus_endpoint::{Registry, Gauge, Counter, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64}; +use message::generic::{Message as GenericMessage, Roles}; +use prometheus_endpoint::{Registry, Gauge, Counter, GaugeVec, PrometheusError, Opts, register, U64}; use sync::{ChainSync, SyncState}; use std::borrow::Cow; use std::collections::{BTreeMap, HashMap, HashSet, VecDeque, hash_map::Entry}; @@ -67,7 +67,7 @@ pub mod message; pub mod event; pub mod sync; -pub use generic_proto::LegacyConnectionKillError; +pub use generic_proto::{NotificationsSink, Ready, NotifsHandlerError, LegacyConnectionKillError}; const REQUEST_TIMEOUT_SEC: u64 = 40; /// Interval at which we perform time based maintenance @@ -388,7 +388,6 @@ impl Protocol { block_announce_validator: Box + Send>, metrics_registry: Option<&Registry>, boot_node_ids: Arc>, - queue_size_report: Option, ) -> error::Result<(Protocol, sc_peerset::PeersetHandle)> { let info = chain.info(); let sync = ChainSync::new( @@ -417,7 +416,6 @@ impl Protocol { versions, build_status_message(&config, &chain), peerset, - queue_size_report, ); let mut legacy_equiv_by_name = HashMap::new(); @@ -948,7 +946,12 @@ impl Protocol { } /// Called on receipt of a status message via the legacy protocol on the first connection between two peers. - pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status) -> CustomMessageOutcome { + pub fn on_peer_connected( + &mut self, + who: PeerId, + status: message::Status, + notifications_sink: NotificationsSink, + ) -> CustomMessageOutcome { trace!(target: "sync", "New peer {} {:?}", who, status); let _protocol_version = { if self.context_data.peers.contains_key(&who) { @@ -1060,32 +1063,7 @@ impl Protocol { remote: who, protocols: self.protocol_name_by_engine.keys().cloned().collect(), roles: info.roles, - } - } - - /// Send a notification to the given peer we're connected to. - /// - /// Doesn't do anything if we don't have a notifications substream for that protocol with that - /// peer. 
- pub fn write_notification( - &mut self, - target: PeerId, - engine_id: ConsensusEngineId, - message: impl Into<Vec<u8>>, - ) { - if let Some(protocol_name) = self.protocol_name_by_engine.get(&engine_id) { - let message = message.into(); - let fallback = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage { - engine_id, - data: message.clone(), - }).encode(); - self.behaviour.write_notification(&target, protocol_name.clone(), message, fallback); - } else { - error!( - target: "sub-libp2p", - "Sending a notification with a protocol that wasn't registered: {:?}", - engine_id - ); + notifications_sink, } } @@ -1099,7 +1077,7 @@ impl Protocol { engine_id: ConsensusEngineId, protocol_name: impl Into<Cow<'static, [u8]>>, handshake_message: Vec<u8>, - ) -> impl ExactSizeIterator<Item = (&'a PeerId, Roles)> + 'a { + ) -> impl Iterator<Item = (&'a PeerId, Roles, &'a NotificationsSink)> + 'a { let protocol_name = protocol_name.into(); if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() { error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name); @@ -1108,8 +1086,15 @@ self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id)); } - self.context_data.peers.iter() - .map(|(peer_id, peer)| (peer_id, peer.info.roles)) + let behaviour = &self.behaviour; + self.context_data.peers.iter().filter_map(move |(peer_id, peer)| { + if let Some(notifications_sink) = behaviour.notifications_sink(peer_id) { + Some((peer_id, peer.info.roles, notifications_sink)) + } else { + log::error!("State mismatch: no notifications sink for opened peer {:?}", peer_id); + None + } + }) } /// Called when peer sends us new transactions @@ -1863,7 +1848,18 @@ pub enum CustomMessageOutcome { JustificationImport(Origin, B::Hash, NumberFor<B>, Justification), FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>), /// Notification protocols have been opened with a remote. - NotificationStreamOpened { remote: PeerId, protocols: Vec<ConsensusEngineId>, roles: Roles }, + NotificationStreamOpened { + remote: PeerId, + protocols: Vec<ConsensusEngineId>, + roles: Roles, + notifications_sink: NotificationsSink + }, + /// The [`NotificationsSink`] of some notification protocols needs an update. + NotificationStreamReplaced { + remote: PeerId, + protocols: Vec<ConsensusEngineId>, + notifications_sink: NotificationsSink, + }, /// Notification protocols have been closed with a remote. NotificationStreamClosed { remote: PeerId, protocols: Vec<ConsensusEngineId> }, /// Messages have been received on one or more notifications protocols. @@ -2028,9 +2024,10 @@ impl NetworkBehaviour for Protocol { }; let outcome = match event { - GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, .. } => { + GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => { match <Message<B> as Decode>::decode(&mut &received_handshake[..]) { - Ok(GenericMessage::Status(handshake)) => self.on_peer_connected(peer_id, handshake), + Ok(GenericMessage::Status(handshake)) => + self.on_peer_connected(peer_id, handshake, notifications_sink), Ok(msg) => { debug!( target: "sync", @@ -2054,6 +2051,13 @@ } } } + GenericProtoOut::CustomProtocolReplaced { peer_id, notifications_sink, .. } => { + CustomMessageOutcome::NotificationStreamReplaced { + remote: peer_id, + protocols: self.protocol_name_by_engine.keys().cloned().collect(), + notifications_sink, + } + }, GenericProtoOut::CustomProtocolClosed { peer_id, ..
} => { self.on_peer_disconnected(peer_id) }, diff --git a/substrate/client/network/src/protocol/generic_proto.rs b/substrate/client/network/src/protocol/generic_proto.rs index cf8434d8bc..3133471b0d 100644 --- a/substrate/client/network/src/protocol/generic_proto.rs +++ b/substrate/client/network/src/protocol/generic_proto.rs @@ -21,7 +21,7 @@ //! network, then performs the Substrate protocol handling on top. pub use self::behaviour::{GenericProto, GenericProtoOut}; -pub use self::handler::LegacyConnectionKillError; +pub use self::handler::{NotifsHandlerError, NotificationsSink, Ready, LegacyConnectionKillError}; mod behaviour; mod handler; diff --git a/substrate/client/network/src/protocol/generic_proto/behaviour.rs b/substrate/client/network/src/protocol/generic_proto/behaviour.rs index 215eb73933..f965980640 100644 --- a/substrate/client/network/src/protocol/generic_proto/behaviour.rs +++ b/substrate/client/network/src/protocol/generic_proto/behaviour.rs @@ -15,8 +15,10 @@ // along with Substrate. If not, see . use crate::config::ProtocolId; -use crate::protocol::generic_proto::handler::{NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}; -use crate::protocol::generic_proto::upgrade::RegisteredProtocol; +use crate::protocol::generic_proto::{ + handler::{NotificationsSink, NotifsHandlerProto, NotifsHandlerOut, NotifsHandlerIn}, + upgrade::RegisteredProtocol +}; use bytes::BytesMut; use fnv::FnvHashMap; @@ -31,7 +33,6 @@ use libp2p::swarm::{ }; use log::{debug, error, trace, warn}; use parking_lot::RwLock; -use prometheus_endpoint::HistogramVec; use rand::distributions::{Distribution as _, Uniform}; use smallvec::SmallVec; use std::task::{Context, Poll}; @@ -149,9 +150,6 @@ pub struct GenericProto { /// Events to produce from `poll()`. events: VecDeque>, - - /// If `Some`, report the message queue sizes on this `Histogram`. - queue_size_report: Option, } /// Identifier for a delay firing. @@ -189,7 +187,7 @@ enum PeerState { /// We may still have ongoing traffic with that peer, but it should cease shortly. Disabled { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, + open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>, /// If `Some`, any dial attempts to this peer are delayed until the given `Instant`. banned_until: Option, }, @@ -199,7 +197,7 @@ enum PeerState { /// but should get disconnected in a few seconds. DisabledPendingEnable { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, + open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>, /// When to enable this remote. References an entry in `delays`. timer: DelayId, /// When the `timer` will trigger. @@ -210,7 +208,7 @@ enum PeerState { /// enabled state. Enabled { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, + open: SmallVec<[(ConnectionId, NotificationsSink); crate::MAX_CONNECTIONS_PER_PEER]>, }, /// We received an incoming connection from this peer and forwarded that @@ -227,15 +225,15 @@ impl PeerState { self.get_open().is_some() } - /// Returns the connection ID of the first established connection + /// Returns the [`NotificationsSink`] of the first established connection /// that is open for custom protocol traffic. 
- fn get_open(&self) -> Option { + fn get_open(&self) -> Option<&NotificationsSink> { match self { PeerState::Disabled { open, .. } | PeerState::DisabledPendingEnable { open, .. } | PeerState::Enabled { open, .. } => if !open.is_empty() { - Some(open[0]) + Some(&open[0].1) } else { None } @@ -284,9 +282,24 @@ pub enum GenericProtoOut { /// Handshake that was sent to us. /// This is normally a "Status" message, but this is out of the concern of this code. received_handshake: Vec, + /// Object that permits sending notifications to the peer. + notifications_sink: NotificationsSink, }, - /// Closed a custom protocol with the remote. + /// The [`NotificationsSink`] object used to send notifications with the given peer must be + /// replaced with a new one. + /// + /// This event is typically emitted when a transport-level connection is closed and we fall + /// back to a secondary connection. + CustomProtocolReplaced { + /// Id of the peer we are connected to. + peer_id: PeerId, + /// Replacement for the previous [`NotificationsSink`]. + notifications_sink: NotificationsSink, + }, + + /// Closed a custom protocol with the remote. The existing [`NotificationsSink`] should + /// be dropped. CustomProtocolClosed { /// Id of the peer we were connected to. peer_id: PeerId, @@ -317,16 +330,12 @@ pub enum GenericProtoOut { impl GenericProto { /// Creates a `CustomProtos`. - /// - /// The `queue_size_report` is an optional Prometheus metric that can report the size of the - /// messages queue. If passed, it must have one label for the protocol name. pub fn new( local_peer_id: PeerId, protocol: impl Into, versions: &[u8], handshake_message: Vec, peerset: sc_peerset::Peerset, - queue_size_report: Option, ) -> Self { let legacy_handshake_message = Arc::new(RwLock::new(handshake_message)); let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message); @@ -342,7 +351,6 @@ impl GenericProto { incoming: SmallVec::new(), next_incoming_index: sc_peerset::IncomingIndex(0), events: VecDeque::new(), - queue_size_report, } } @@ -394,6 +402,15 @@ impl GenericProto { self.peers.get(peer_id).map(|p| p.is_open()).unwrap_or(false) } + /// Returns the [`NotificationsSink`] that sends notifications to the given peer, or `None` + /// if the custom protocols aren't opened with this peer. + /// + /// If [`GenericProto::is_open`] returns `true` for this `PeerId`, then this method is + /// guaranteed to return `Some`. + pub fn notifications_sink(&self, peer_id: &PeerId) -> Option<&NotificationsSink> { + self.peers.get(peer_id).and_then(|p| p.get_open()) + } + /// Disconnects the given peer if we are connected to it. 
pub fn disconnect_peer(&mut self, peer_id: &PeerId) { debug!(target: "sub-libp2p", "External API => Disconnect {:?}", peer_id); @@ -538,14 +555,14 @@ impl GenericProto { message: impl Into<Vec<u8>>, encoded_fallback_message: Vec<u8>, ) { - let conn = match self.peers.get(target).and_then(|p| p.get_open()) { + let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) { None => { debug!(target: "sub-libp2p", "Tried to send notification to {:?} without an open channel.", target); return }, - Some(conn) => conn + Some(sink) => sink }; trace!( @@ -555,16 +572,11 @@ str::from_utf8(&protocol_name) ); trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); - - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: target.clone(), - handler: NotifyHandler::One(conn), - event: NotifsHandlerIn::SendNotification { - message: message.into(), - encoded_fallback_message, - protocol_name, - }, - }); + notifs_sink.send_sync_notification( + &protocol_name, + encoded_fallback_message, + message ); } /// Sends a message to a peer. @@ -574,25 +586,19 @@ /// Also note that even if we have a valid open substream, it may in fact be already closed /// without us knowing, in which case the packet will not be received. pub fn send_packet(&mut self, target: &PeerId, message: Vec<u8>) { - let conn = match self.peers.get(target).and_then(|p| p.get_open()) { + let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) { None => { debug!(target: "sub-libp2p", "Tried to send packet to {:?} without an open channel.", target); return } - Some(conn) => conn + Some(sink) => sink }; trace!(target: "sub-libp2p", "External API => Packet for {:?}", target); trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: target.clone(), - handler: NotifyHandler::One(conn), - event: NotifsHandlerIn::SendLegacy { - message, - } - }); + notifs_sink.send_legacy(message); } /// Returns the state of the peerset manager, for debugging purposes. @@ -873,7 +879,6 @@ impl NetworkBehaviour for GenericProto { NotifsHandlerProto::new( self.legacy_protocol.clone(), self.notif_protocols.clone(), - self.queue_size_report.clone() ) } @@ -985,15 +990,26 @@ // i.e. there is no connection that is open for custom protocols, // in which case `CustomProtocolClosed` was already emitted.
let closed = open.is_empty(); - open.retain(|c| c != conn); - if open.is_empty() && !closed { - debug!(target: "sub-libp2p", "External API <= Closed({})", peer_id); - let event = GenericProtoOut::CustomProtocolClosed { - peer_id: peer_id.clone(), - reason: "Disconnected by libp2p".into(), - }; + let sink_closed = open.get(0).map_or(false, |(c, _)| c == conn); + open.retain(|(c, _)| c != conn); + if !closed { + if let Some((_, sink)) = open.get(0) { + if sink_closed { + let event = GenericProtoOut::CustomProtocolReplaced { + peer_id: peer_id.clone(), + notifications_sink: sink.clone(), + }; + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + } + } else { + debug!(target: "sub-libp2p", "External API <= Closed({})", peer_id); + let event = GenericProtoOut::CustomProtocolClosed { + peer_id: peer_id.clone(), + reason: "Disconnected by libp2p".into(), + }; - self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + } } } _ => {} @@ -1140,9 +1156,11 @@ impl NetworkBehaviour for GenericProto { return }; - let last = match mem::replace(entry.get_mut(), PeerState::Poisoned) { + let (last, new_notifications_sink) = match mem::replace(entry.get_mut(), PeerState::Poisoned) { PeerState::Enabled { mut open } => { - if let Some(pos) = open.iter().position(|c| c == &connection) { + let pos = open.iter().position(|(c, _)| c == &connection); + let sink_closed = pos == Some(0); + if let Some(pos) = pos { open.remove(pos); } else { debug_assert!(false); @@ -1167,16 +1185,24 @@ impl NetworkBehaviour for GenericProto { }); let last = open.is_empty(); + let new_notifications_sink = open.iter().next().and_then(|(_, sink)| + if sink_closed { + Some(sink.clone()) + } else { + None + }); *entry.into_mut() = PeerState::Disabled { open, banned_until: None }; - last + (last, new_notifications_sink) }, PeerState::Disabled { mut open, banned_until } => { - if let Some(pos) = open.iter().position(|c| c == &connection) { + let pos = open.iter().position(|(c, _)| c == &connection); + let sink_closed = pos == Some(0); + if let Some(pos) = pos { open.remove(pos); } else { debug_assert!(false); @@ -1188,18 +1214,28 @@ impl NetworkBehaviour for GenericProto { } let last = open.is_empty(); + let new_notifications_sink = open.iter().next().and_then(|(_, sink)| + if sink_closed { + Some(sink.clone()) + } else { + None + }); + *entry.into_mut() = PeerState::Disabled { open, banned_until }; - last + + (last, new_notifications_sink) }, PeerState::DisabledPendingEnable { mut open, timer, timer_deadline } => { - if let Some(pos) = open.iter().position(|c| c == &connection) { + let pos = open.iter().position(|(c, _)| c == &connection); + let sink_closed = pos == Some(0); + if let Some(pos) = pos { open.remove(pos); } else { debug_assert!(false); @@ -1211,12 +1247,20 @@ impl NetworkBehaviour for GenericProto { } let last = open.is_empty(); + let new_notifications_sink = open.iter().next().and_then(|(_, sink)| + if sink_closed { + Some(sink.clone()) + } else { + None + }); + *entry.into_mut() = PeerState::DisabledPendingEnable { open, timer, timer_deadline }; - last + + (last, new_notifications_sink) }, state => { error!(target: "sub-libp2p", @@ -1233,12 +1277,20 @@ impl NetworkBehaviour for GenericProto { peer_id: source, }; self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + } else { + if let Some(new_notifications_sink) = new_notifications_sink { + let event = GenericProtoOut::CustomProtocolReplaced { + peer_id: 
source, + notifications_sink: new_notifications_sink, + }; + self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); + } debug!(target: "sub-libp2p", "Secondary connection closed custom protocol."); } } - NotifsHandlerOut::Open { endpoint, received_handshake } => { + NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink } => { debug!(target: "sub-libp2p", "Handler({:?}) => Endpoint {:?} open for custom protocols.", source, endpoint); @@ -1248,8 +1300,8 @@ impl NetworkBehaviour for GenericProto { Some(PeerState::DisabledPendingEnable { ref mut open, .. }) | Some(PeerState::Disabled { ref mut open, .. }) => { let first = open.is_empty(); - if !open.iter().any(|c| *c == connection) { - open.push(connection); + if !open.iter().any(|(c, _)| *c == connection) { + open.push((connection, notifications_sink.clone())); } else { error!( target: "sub-libp2p", @@ -1269,7 +1321,11 @@ impl NetworkBehaviour for GenericProto { if first { debug!(target: "sub-libp2p", "External API <= Open({:?})", source); - let event = GenericProtoOut::CustomProtocolOpen { peer_id: source, received_handshake }; + let event = GenericProtoOut::CustomProtocolOpen { + peer_id: source, + received_handshake, + notifications_sink + }; self.events.push_back(NetworkBehaviourAction::GenerateEvent(event)); } else { diff --git a/substrate/client/network/src/protocol/generic_proto/handler.rs b/substrate/client/network/src/protocol/generic_proto/handler.rs index 3b4469a872..5845130a7d 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler.rs @@ -15,7 +15,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -pub use self::group::{NotifsHandlerProto, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut}; + +pub use self::group::{ + NotificationsSink, NotifsHandlerError, Ready, NotifsHandlerProto, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut +}; pub use self::legacy::ConnectionKillError as LegacyConnectionKillError; mod group; diff --git a/substrate/client/network/src/protocol/generic_proto/handler/group.rs b/substrate/client/network/src/protocol/generic_proto/handler/group.rs index 3403f7dd82..2826f7a19c 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/group.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/group.rs @@ -63,11 +63,21 @@ use libp2p::swarm::{ SubstreamProtocol, NegotiatedSubstream, }; +use futures::{ + channel::mpsc, + lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard}, + prelude::* +}; use log::{debug, error}; -use parking_lot::RwLock; -use prometheus_endpoint::HistogramVec; +use parking_lot::{Mutex, RwLock}; use std::{borrow::Cow, error, io, str, sync::Arc, task::{Context, Poll}}; +/// Number of pending notifications in asynchronous contexts. +/// See [`NotificationsSink::reserve_notification`] for context. +const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8; +/// Number of pending notifications in synchronous contexts. +const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048; + /// Implements the `IntoProtocolsHandler` trait of libp2p. /// /// Every time a connection with a remote starts, an instance of this struct is created and @@ -107,6 +117,18 @@ pub struct NotifsHandler { /// we push the corresponding index here and process them when the handler /// gets enabled/disabled. 
pending_in: Vec<usize>, + + /// If `Some`, contains the two `Receiver`s connected to the [`NotificationsSink`] that has + /// been sent out. The notifications to send out can be pulled from these receivers. + /// We use two different channels in order to have two different channel sizes, but from the + /// receiving point of view, the two channels are the same. + /// The receivers are fused in case the user drops the [`NotificationsSink`] entirely. + notifications_sink_rx: Option< + stream::Select< + stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>, + stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>> + > + >, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -140,6 +162,7 @@ impl IntoProtocolsHandler for NotifsHandlerProto { legacy: self.legacy.into_handler(remote_peer_id, connected_point), enabled: EnabledState::Initial, pending_in: Vec::new(), + notifications_sink_rx: None, } } } @@ -152,32 +175,6 @@ pub enum NotifsHandlerIn { /// The node should stop using custom protocols. Disable, - - /// Sends a message through the custom protocol substream. - /// - /// > **Note**: This must **not** be a `ConsensusMessage`, `Transactions`, or - /// > `BlockAnnounce` message. - SendLegacy { - /// The message to send. - message: Vec<u8>, - }, - - /// Sends a notifications message. - SendNotification { - /// Name of the protocol for the message. - /// - /// Must match one of the registered protocols. For backwards-compatibility reasons, if - /// the remote doesn't support this protocol, we use the legacy substream. - protocol_name: Cow<'static, [u8]>, - - /// Message to send on the legacy substream if the protocol isn't available. - /// - /// This corresponds to what you would have sent with `SendLegacy`. - encoded_fallback_message: Vec<u8>, - - /// The message to send. - message: Vec<u8>, - }, } /// Event that can be emitted by a `NotifsHandler`. @@ -190,6 +187,8 @@ pub enum NotifsHandlerOut { /// Handshake that was sent to us. /// This is normally a "Status" message, but this is out of the concern of this code. received_handshake: Vec<u8>, + /// How notifications can be sent to this node. + notifications_sink: NotificationsSink, }, /// The connection is closed for custom protocols. @@ -227,19 +226,160 @@ pub enum NotifsHandlerOut { }, } +/// Sink connected directly to the node background task. Allows sending notifications to the peer. +/// +/// Can be cloned in order to obtain multiple references to the same peer. +#[derive(Debug, Clone)] +pub struct NotificationsSink { + inner: Arc<NotificationsSinkInner>, +} + +#[derive(Debug)] +struct NotificationsSinkInner { + /// Sender to use in asynchronous contexts. Uses an asynchronous mutex. + async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>, + /// Sender to use in synchronous contexts. Uses a synchronous mutex. + /// This channel has a large capacity and is meant to be used in contexts where + /// back-pressure cannot be properly exerted. + /// It will be removed in a future version. + sync_channel: Mutex<mpsc::Sender<NotificationsSinkMessage>>, +} + +/// Message emitted through the [`NotificationsSink`] and processed by the background task +/// dedicated to the peer. +#[derive(Debug)] +enum NotificationsSinkMessage { + /// Message emitted by [`NotificationsSink::send_legacy`]. + Legacy { + message: Vec<u8>, + }, + + /// Message emitted by [`NotificationsSink::reserve_notification`] and + /// [`NotificationsSink::send_sync_notification`]. + Notification { + protocol_name: Vec<u8>, + encoded_fallback_message: Vec<u8>, + message: Vec<u8>, + }, + + /// Must close the connection. + ForceClose, +} + +impl NotificationsSink { + /// Sends a message to the peer using the legacy substream.
+ /// + /// If too many messages are already buffered, the message is silently discarded and the + /// connection to the peer will be closed shortly after. + /// + /// This method will be removed in a future version. + pub fn send_legacy<'a>(&'a self, message: impl Into<Vec<u8>>) { + let mut lock = self.inner.sync_channel.lock(); + let result = lock.try_send(NotificationsSinkMessage::Legacy { + message: message.into() + }); + + if result.is_err() { + // Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the + // buffer, and therefore that `try_send` will succeed. + let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose); + debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected())); + } + } + + /// Sends a notification to the peer. + /// + /// If too many messages are already buffered, the notification is silently discarded and the + /// connection to the peer will be closed shortly after. + /// + /// The protocol name is expected to be checked ahead of calling this method. It is a logic + /// error to send a notification using an unknown protocol. + /// + /// This method will be removed in a future version. + pub fn send_sync_notification<'a>( + &'a self, + protocol_name: &[u8], + encoded_fallback_message: impl Into<Vec<u8>>, + message: impl Into<Vec<u8>> + ) { + let mut lock = self.inner.sync_channel.lock(); + let result = lock.try_send(NotificationsSinkMessage::Notification { + protocol_name: protocol_name.to_owned(), + encoded_fallback_message: encoded_fallback_message.into(), + message: message.into() + }); + + if result.is_err() { + // Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the + // buffer, and therefore that `try_send` will succeed. + let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose); + debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected())); + } + } + + /// Wait until the remote is ready to accept a notification. + /// + /// Returns an error in the case where the connection is closed. + /// + /// The protocol name is expected to be checked ahead of calling this method. It is a logic + /// error to send a notification using an unknown protocol. + pub async fn reserve_notification<'a>(&'a self, protocol_name: &[u8]) -> Result<Ready<'a>, ()> { + let mut lock = self.inner.async_channel.lock().await; + + let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await; + if poll_ready.is_ok() { + Ok(Ready { protocol_name: protocol_name.to_owned(), lock }) + } else { + Err(()) + } + } +} + +/// Notification slot is reserved and the notification can actually be sent. +#[must_use] +#[derive(Debug)] +pub struct Ready<'a> { + /// Guarded channel. The channel inside is guaranteed to not be full. + lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>, + /// Name of the protocol. Should match one of the protocols passed at initialization. + protocol_name: Vec<u8>, +} + +impl<'a> Ready<'a> { + /// Consumes this slot's reservation and actually queues the notification. + /// + /// Returns an error if the substream has been closed. + pub fn send( + mut self, + encoded_fallback_message: impl Into<Vec<u8>>, + notification: impl Into<Vec<u8>> + ) -> Result<(), ()> { + self.lock.start_send(NotificationsSinkMessage::Notification { + protocol_name: self.protocol_name, + encoded_fallback_message: encoded_fallback_message.into(), + message: notification.into(), + }).map_err(|_| ()) + } +} + +/// Error specific to the collection of protocols.
+#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum NotifsHandlerError { + /// Channel of synchronous notifications is full. + SyncNotificationsClogged, + /// Error in legacy protocol. + Legacy(<LegacyProtoHandler as ProtocolsHandler>::Error), +} + impl NotifsHandlerProto { /// Builds a new handler. /// /// `list` is a list of notification protocols names, and the message to send as part of the /// handshake. At the moment, the message is always the same whether we open a substream /// ourselves or respond to handshake from the remote. - /// - /// The `queue_size_report` is an optional Prometheus metric that can report the size of the - /// messages queue. If passed, it must have one label for the protocol name. pub fn new( legacy: RegisteredProtocol, list: impl Into<Vec<(Cow<'static, [u8]>, Arc<RwLock<Vec<u8>>>)>>, - queue_size_report: Option<HistogramVec> ) -> Self { let list = list.into(); @@ -247,16 +387,7 @@ .clone() .into_iter() .map(|(proto_name, initial_message)| { - let queue_size_report = queue_size_report.as_ref().and_then(|qs| { - if let Ok(utf8) = str::from_utf8(&proto_name) { - Some(qs.with_label_values(&[utf8])) - } else { - log::warn!("Ignoring Prometheus metric because {:?} isn't UTF-8", proto_name); - None - } - }); - - (NotifsOutHandlerProto::new(proto_name, queue_size_report), initial_message) + (NotifsOutHandlerProto::new(proto_name), initial_message) }).collect(); let in_handlers = list.clone() @@ -275,13 +406,7 @@ impl ProtocolsHandler for NotifsHandler { type InEvent = NotifsHandlerIn; type OutEvent = NotifsHandlerOut; - type Error = EitherError< - EitherError< - <NotifsInHandler as ProtocolsHandler>::Error, - <NotifsOutHandler as ProtocolsHandler>::Error, - >, - <LegacyProtoHandler as ProtocolsHandler>::Error, - >; + type Error = NotifsHandlerError; type InboundProtocol = SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol>; type OutboundProtocol = EitherUpgrade<NotificationsOut, RegisteredProtocol>; // Index within the `out_handlers`; None for legacy @@ -363,24 +488,6 @@ impl ProtocolsHandler for NotifsHandler { self.in_handlers[num].0.inject_event(NotifsInHandlerIn::Refuse); } }, - NotifsHandlerIn::SendLegacy { message } => - self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }), - NotifsHandlerIn::SendNotification { message, encoded_fallback_message, protocol_name } => { - for (handler, _) in &mut self.out_handlers { - if handler.protocol_name() != &protocol_name[..] { - continue; - } - - if handler.is_open() { - handler.inject_event(NotifsOutHandlerIn::Send(message)); - return; - } - } - - self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { - message: encoded_fallback_message, - }); - }, } } @@ -461,6 +568,60 @@ impl ProtocolsHandler for NotifsHandler { ) -> Poll< ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error> > { + if let Some(notifications_sink_rx) = &mut self.notifications_sink_rx { + 'poll_notifs_sink: loop { + // Before we poll the notifications sink receiver, check that all the notification + // channels are ready to send a message.
+ // TODO: it is planned that in the future we switch to one `NotificationsSink` per + // protocol, in which case each sink should wait only for its corresponding handler + // to be ready, and not all handlers + // see https://github.com/paritytech/substrate/issues/5670 + for (out_handler, _) in &mut self.out_handlers { + match out_handler.poll_ready(cx) { + Poll::Ready(_) => {}, + Poll::Pending => break 'poll_notifs_sink, + } + } + + let message = match notifications_sink_rx.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) | Poll::Pending => break, + }; + + match message { + NotificationsSinkMessage::Legacy { message } => { + self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { + message + }); + } + NotificationsSinkMessage::Notification { + protocol_name, + encoded_fallback_message, + message + } => { + for (handler, _) in &mut self.out_handlers { + if handler.protocol_name() != &protocol_name[..] { + continue; + } + + if handler.is_open() { + handler.send_or_discard(message); + } + + continue 'poll_notifs_sink; + } + + self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { + message: encoded_fallback_message, + }); + } + NotificationsSinkMessage::ForceClose => { + return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)); + } + } + } + } + if let Poll::Ready(ev) = self.legacy.poll(cx) { return match ev { ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol, info: () } => @@ -468,14 +629,37 @@ impl ProtocolsHandler for NotifsHandler { protocol: protocol.map_upgrade(EitherUpgrade::B), info: None, }), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { endpoint, received_handshake, .. }) => + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolOpen { + endpoint, + received_handshake, + .. + }) => { + let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); + let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); + let notifications_sink = NotificationsSink { + inner: Arc::new(NotificationsSinkInner { + async_channel: FuturesMutex::new(async_tx), + sync_channel: Mutex::new(sync_tx), + }), + }; + + debug_assert!(self.notifications_sink_rx.is_none()); + self.notifications_sink_rx = Some(stream::select(async_rx.fuse(), sync_rx.fuse())); + Poll::Ready(ProtocolsHandlerEvent::Custom( - NotifsHandlerOut::Open { endpoint, received_handshake } - )), - ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => + NotifsHandlerOut::Open { endpoint, received_handshake, notifications_sink } + )) + }, + ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomProtocolClosed { endpoint, reason }) => { + // We consciously drop the receivers despite notifications being potentially + // still buffered up. 
+ debug_assert!(self.notifications_sink_rx.is_some()); + self.notifications_sink_rx = None; + Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::Closed { endpoint, reason } - )), + )) + }, ProtocolsHandlerEvent::Custom(LegacyProtoHandlerOut::CustomMessage { message }) => Poll::Ready(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CustomMessage { message } @@ -485,7 +669,7 @@ impl ProtocolsHandler for NotifsHandler { NotifsHandlerOut::ProtocolError { is_severe, error } )), ProtocolsHandlerEvent::Close(err) => - Poll::Ready(ProtocolsHandlerEvent::Close(EitherError::B(err))), + Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))), } } diff --git a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs index 6b97ad67e3..14de382c1b 100644 --- a/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs +++ b/substrate/client/network/src/protocol/generic_proto/handler/notif_out.rs @@ -34,8 +34,10 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{debug, warn, error}; -use prometheus_endpoint::Histogram; -use std::{borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll}, time::Duration}; +use std::{ + borrow::Cow, collections::VecDeque, fmt, mem, pin::Pin, task::{Context, Poll, Waker}, + time::Duration +}; use wasm_timer::Instant; /// Maximum duration to open a substream and receive the handshake message. After that, we @@ -56,17 +58,14 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5); pub struct NotifsOutHandlerProto { /// Name of the protocol to negotiate. protocol_name: Cow<'static, [u8]>, - /// Optional Prometheus histogram to report message queue size variations. - queue_size_report: Option, } impl NotifsOutHandlerProto { /// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the /// notifications substream. - pub fn new(protocol_name: impl Into>, queue_size_report: Option) -> Self { + pub fn new(protocol_name: impl Into>) -> Self { NotifsOutHandlerProto { protocol_name: protocol_name.into(), - queue_size_report, } } } @@ -78,14 +77,12 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto { DeniedUpgrade } - fn into_handler(self, peer_id: &PeerId, _: &ConnectedPoint) -> Self::Handler { + fn into_handler(self, _: &PeerId, _: &ConnectedPoint) -> Self::Handler { NotifsOutHandler { protocol_name: self.protocol_name, when_connection_open: Instant::now(), - queue_size_report: self.queue_size_report, state: State::Disabled, events_queue: VecDeque::new(), - peer_id: peer_id.clone(), } } } @@ -108,17 +105,11 @@ pub struct NotifsOutHandler { /// When the connection with the remote has been successfully established. when_connection_open: Instant, - /// Optional prometheus histogram to report message queue sizes variations. - queue_size_report: Option, - /// Queue of events to send to the outside. /// /// This queue must only ever be modified to insert elements at the back, or remove the first /// element. events_queue: VecDeque>, - - /// Who we are connected to. - peer_id: PeerId, } /// Our relationship with the node we're connected to. @@ -153,6 +144,11 @@ enum State { Open { /// Substream that is currently open. substream: NotificationsOutSubstream, + /// Waker for the last task that got `Poll::Pending` from `poll_ready`, to notify + /// when the open substream closes due to being disabled or encountering an + /// error, i.e. 
to notify the task as soon as the substream becomes unavailable, + /// without waiting for an underlying I/O task wakeup. + close_waker: Option<Waker>, /// The initial message that we sent. Necessary if we need to re-open a substream. initial_message: Vec<u8>, }, @@ -173,11 +169,6 @@ pub enum NotifsOutHandlerIn { /// Disables the notifications substream for this node. This is the default state. Disable, - - /// Sends a message on the notifications substream. Ignored if the substream isn't open. - /// - /// It is only valid to send this if the notifications substream has been enabled. - Send(Vec<u8>), } /// Event that can be emitted by a `NotifsOutHandler`. @@ -216,6 +207,41 @@ impl NotifsOutHandler { pub fn protocol_name(&self) -> &[u8] { &self.protocol_name } + + /// Polls whether the outbound substream is ready to send a notification. + /// + /// - Returns `Poll::Pending` if the substream is open but not ready to send a notification. + /// - Returns `Poll::Ready(true)` if the substream is ready to send a notification. + /// - Returns `Poll::Ready(false)` if the substream is closed. + /// + pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<bool> { + if let State::Open { substream, close_waker, .. } = &mut self.state { + match substream.poll_ready_unpin(cx) { + Poll::Ready(Ok(())) => Poll::Ready(true), + Poll::Ready(Err(_)) => Poll::Ready(false), + Poll::Pending => { + *close_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + } else { + Poll::Ready(false) + } + } + + /// Sends out a notification. + /// + /// If the substream is closed, or not ready to send out a notification yet, then the + /// notification is silently discarded. + /// + /// You are encouraged to call [`NotifsOutHandler::poll_ready`] beforehand to determine + /// whether this will succeed. If `Poll::Ready(true)` is returned, then this method will send + /// out a notification. + pub fn send_or_discard(&mut self, notification: Vec<u8>) { + if let State::Open { substream, .. } = &mut self.state { + let _ = substream.start_send_unpin(notification); + } + } } impl ProtocolsHandler for NotifsOutHandler { @@ -247,7 +273,7 @@ State::Opening { initial_message } => { let ev = NotifsOutHandlerOut::Open { handshake: handshake_msg }; self.events_queue.push_back(ProtocolsHandlerEvent::Custom(ev)); - self.state = State::Open { substream, initial_message }; + self.state = State::Open { substream, initial_message, close_waker: None }; }, // If the handler was disabled while we were negotiating the protocol, immediately // close it. @@ -310,31 +336,15 @@ } State::Opening { .. } => self.state = State::DisabledOpening, State::Refused => self.state = State::Disabled, - State::Open { substream, .. } => self.state = State::DisabledOpen(substream), + State::Open { substream, close_waker, .. } => { + if let Some(close_waker) = close_waker { + close_waker.wake(); + } + self.state = State::DisabledOpen(substream) + }, State::Poisoned => error!("☎️ Notifications handler in a poisoned state"), } } - - NotifsOutHandlerIn::Send(msg) => - if let State::Open { substream, .. } = &mut self.state { - if substream.push_message(msg).is_err() { - warn!( - target: "sub-libp2p", - "📞 Notifications queue with peer {} is full, dropped message (protocol: {:?})", - self.peer_id, - self.protocol_name, - ); - } - if let Some(metric) = &self.queue_size_report { - metric.observe(substream.queue_len() as f64); - } - } else { - // This is an API misuse.
- warn!( - target: "sub-libp2p", - "📞 Tried to send a notification on a disabled handler" - ); - }, } } @@ -375,10 +385,14 @@ impl ProtocolsHandler for NotifsOutHandler { } match &mut self.state { - State::Open { substream, initial_message } => + State::Open { substream, initial_message, close_waker } => match Sink::poll_flush(Pin::new(substream), cx) { Poll::Pending | Poll::Ready(Ok(())) => {}, Poll::Ready(Err(_)) => { + if let Some(close_waker) = close_waker.take() { + close_waker.wake(); + } + // We try to re-open a substream. let initial_message = mem::replace(initial_message, Vec::new()); self.state = State::Opening { initial_message: initial_message.clone() }; diff --git a/substrate/client/network/src/protocol/generic_proto/tests.rs b/substrate/client/network/src/protocol/generic_proto/tests.rs index f932a3a089..cf9f72b89b 100644 --- a/substrate/client/network/src/protocol/generic_proto/tests.rs +++ b/substrate/client/network/src/protocol/generic_proto/tests.rs @@ -83,7 +83,7 @@ fn build_nodes() -> (Swarm, Swarm) { }); let behaviour = CustomProtoWithAddr { - inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset, None), + inner: GenericProto::new(local_peer_id, &b"test"[..], &[1], vec![], peerset), addrs: addrs .iter() .enumerate() @@ -221,9 +221,10 @@ fn two_nodes_transfer_lots_of_packets() { // We spawn two nodes, then make the first one send lots of packets to the second one. The test // ends when the second one has received all of them. - // Note that if we go too high, we will reach the limit to the number of simultaneous - // substreams allowed by the multiplexer. - const NUM_PACKETS: u32 = 5000; + // This test consists in transferring this given number of packets. Considering that (by + // design) the connection gets closed if one of the remotes can't follow the pace, this number + // should not exceed the size of the buffer of pending notifications. + const NUM_PACKETS: u32 = 512; let (mut service1, mut service2) = build_nodes(); diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs index ce2d1934c0..dd02d7e266 100644 --- a/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/legacy.rs @@ -174,7 +174,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin { } // Indicating that the remote is clogged if that's the case. 
- if self.send_queue.len() >= 2048 { + if self.send_queue.len() >= 1536 { if !self.clogged_fuse { // Note: this fuse is important not just for preventing us from flooding the logs; // if you remove the fuse, then we will always return early from this function and diff --git a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs index efcd0a4c8f..f1f41d5bcc 100644 --- a/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/substrate/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -34,17 +34,15 @@ /// use bytes::BytesMut; -use futures::{prelude::*, ready}; +use futures::prelude::*; use futures_codec::Framed; use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade}; use log::error; -use std::{borrow::Cow, collections::VecDeque, convert::TryFrom as _, io, iter, mem, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, io, iter, mem, pin::Pin, task::{Context, Poll}}; use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. const MAX_HANDSHAKE_SIZE: usize = 1024; -/// Maximum number of buffered messages before we refuse to accept more. -const MAX_PENDING_MESSAGES: usize = 512; /// Upgrade that accepts a substream, sends back a status message, then becomes a unidirectional /// stream of messages. @@ -93,10 +91,6 @@ pub struct NotificationsOutSubstream { /// Substream where to send messages. #[pin] socket: Framed>>>, - /// Queue of messages waiting to be sent. - messages_queue: VecDeque>, - /// If true, we need to flush `socket`. - need_flush: bool, } impl NotificationsIn { @@ -272,80 +266,38 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, Ok((handshake, NotificationsOutSubstream { socket: Framed::new(socket, UviBytes::default()), - messages_queue: VecDeque::with_capacity(MAX_PENDING_MESSAGES), - need_flush: false, })) }) } } -impl NotificationsOutSubstream { - /// Returns the number of items in the queue, capped to `u32::max_value()`. - pub fn queue_len(&self) -> u32 { - u32::try_from(self.messages_queue.len()).unwrap_or(u32::max_value()) - } - - /// Push a message to the queue of messages. - /// - /// This has the same effect as the `Sink::start_send` implementation. 
- pub fn push_message(&mut self, item: Vec) -> Result<(), NotificationsOutError> { - if self.messages_queue.len() >= MAX_PENDING_MESSAGES { - return Err(NotificationsOutError::Clogged); - } - - self.messages_queue.push_back(item); - Ok(()) - } -} - impl Sink> for NotificationsOutSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, { type Error = NotificationsOutError; - fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + Sink::poll_ready(this.socket.as_mut(), cx) + .map_err(NotificationsOutError::Io) } - fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { - self.push_message(item) + fn start_send(self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { + let mut this = self.project(); + Sink::start_send(this.socket.as_mut(), io::Cursor::new(item)) + .map_err(NotificationsOutError::Io) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); - - while !this.messages_queue.is_empty() { - match Sink::poll_ready(this.socket.as_mut(), cx) { - Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), - Poll::Ready(Ok(())) => { - let msg = this.messages_queue.pop_front() - .expect("checked for !is_empty above; qed"); - Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg))?; - *this.need_flush = true; - }, - Poll::Pending => return Poll::Pending, - } - } - - if *this.need_flush { - match Sink::poll_flush(this.socket.as_mut(), cx) { - Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), - Poll::Ready(Ok(())) => *this.need_flush = false, - Poll::Pending => return Poll::Pending, - } - } - - Poll::Ready(Ok(())) + Sink::poll_flush(this.socket.as_mut(), cx) + .map_err(NotificationsOutError::Io) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - ready!(Sink::poll_flush(self.as_mut(), cx))?; - let this = self.project(); - match Sink::poll_close(this.socket, cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(From::from(err))), - Poll::Pending => Poll::Pending, - } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + Sink::poll_close(this.socket.as_mut(), cx) + .map_err(NotificationsOutError::Io) } } @@ -386,13 +338,6 @@ impl From for NotificationsHandshakeError { pub enum NotificationsOutError { /// I/O error on the substream. Io(io::Error), - - /// Remote doesn't process our messages quickly enough. - /// - /// > **Note**: This is not necessarily the remote's fault, and could also be caused by the - /// > local node sending data too quickly. Properly doing back-pressure, however, - /// > would require a deep refactoring effort in Substrate as a whole. 
- Clogged, } #[cfg(test)] @@ -402,7 +347,6 @@ mod tests { use async_std::net::{TcpListener, TcpStream}; use futures::{prelude::*, channel::oneshot}; use libp2p::core::upgrade; - use std::pin::Pin; #[test] fn basic_works() { @@ -582,57 +526,4 @@ mod tests { async_std::task::block_on(client); } - - #[test] - fn buffer_is_full_closes_connection() { - const PROTO_NAME: &'static [u8] = b"/test/proto/1"; - let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); - - let client = async_std::task::spawn(async move { - let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); - let (handshake, mut substream) = upgrade::apply_outbound( - socket, - NotificationsOut::new(PROTO_NAME, vec![]), - upgrade::Version::V1 - ).await.unwrap(); - - assert!(handshake.is_empty()); - - // Push an item and flush so that the test works. - substream.send(b"hello world".to_vec()).await.unwrap(); - - for _ in 0..32768 { - // Push an item on the sink without flushing until an error happens because the - // buffer is full. - let message = b"hello world!".to_vec(); - if future::poll_fn(|cx| Sink::poll_ready(Pin::new(&mut substream), cx)).await.is_err() { - return Ok(()); - } - if Sink::start_send(Pin::new(&mut substream), message).is_err() { - return Ok(()); - } - } - - Err(()) - }); - - async_std::task::block_on(async move { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); - - let (socket, _) = listener.accept().await.unwrap(); - let (initial_message, mut substream) = upgrade::apply_inbound( - socket, - NotificationsIn::new(PROTO_NAME) - ).await.unwrap(); - - assert!(initial_message.is_empty()); - substream.send_handshake(vec![]); - - // Process one message so that the handshake and all works. - let _ = substream.next().await.unwrap().unwrap(); - - client.await.unwrap(); - }); - } } diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index ea045013e1..c11a620c56 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -38,7 +38,7 @@ use crate::{ }, on_demand_layer::AlwaysBadChecker, light_client_handler, block_requests, finality_requests, - protocol::{self, event::Event, LegacyConnectionKillError, sync::SyncState, PeerInfo, Protocol}, + protocol::{self, event::Event, NotifsHandlerError, LegacyConnectionKillError, NotificationsSink, Ready, sync::SyncState, PeerInfo, Protocol}, transport, ReputationChange, }; use futures::prelude::*; @@ -50,7 +50,8 @@ use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, protocols_handle use log::{error, info, trace, warn}; use parking_lot::Mutex; use prometheus_endpoint::{ - register, Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, + register, Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts, + PrometheusError, Registry, U64, }; use sc_peerset::PeersetHandle; use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link}; @@ -61,7 +62,7 @@ use sp_runtime::{ use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use std::{ borrow::{Borrow, Cow}, - collections::HashSet, + collections::{HashMap, HashSet}, fs, marker::PhantomData, num:: NonZeroUsize, @@ -95,6 +96,14 @@ pub struct NetworkService { peerset: PeersetHandle, /// Channel that sends messages to the actual worker. 
 	to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B, H>>,
+	/// For each peer and protocol combination, an object that allows sending notifications to
+	/// that peer. Updated by the [`NetworkWorker`].
+	peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
+	/// For each legacy gossiping engine ID, the corresponding new protocol name.
+	protocol_name_by_engine: Mutex<HashMap<ConsensusEngineId, Cow<'static, [u8]>>>,
+	/// Field extracted from the [`Metrics`] struct and necessary to report the
+	/// notifications-related metrics.
+	notifications_sizes_metric: Option<HistogramVec>,
 	/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
 	/// compatibility.
 	_marker: PhantomData<H>,
@@ -242,7 +251,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 			params.block_announce_validator,
 			params.metrics_registry.as_ref(),
 			boot_node_ids.clone(),
-			metrics.as_ref().map(|m| m.notifications_queues_size.clone()),
 		)?;
 
 		// Build the swarm.
@@ -342,6 +350,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 		}
 
 		let external_addresses = Arc::new(Mutex::new(Vec::new()));
+		let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new()));
+		let protocol_name_by_engine = Mutex::new({
+			params.network_config.notifications_protocols.iter().cloned().collect()
+		});
 
 		let service = Arc::new(NetworkService {
 			bandwidth,
@@ -351,6 +363,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 			peerset: peerset_handle,
 			local_peer_id,
 			to_worker,
+			peers_notifications_sinks: peers_notifications_sinks.clone(),
+			protocol_name_by_engine,
+			notifications_sizes_metric:
+				metrics.as_ref().map(|metrics| metrics.notifications_sizes.clone()),
 			_marker: PhantomData,
 		});
 
@@ -364,6 +380,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 			from_service,
 			light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()),
 			event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
+			peers_notifications_sinks,
 			metrics,
 			boot_node_ids,
 		})
@@ -542,8 +559,16 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
 		&self.local_peer_id
 	}
 
-	/// Writes a message on an open notifications channel. Has no effect if the notifications
-	/// channel with this protocol name is closed.
+	/// Appends a notification to the buffer of pending outgoing notifications with the given peer.
+	/// Has no effect if the notifications channel with this protocol name is not open.
+	///
+	/// If the buffer of pending outgoing notifications with that peer is full, the notification
+	/// is silently dropped and the connection to the remote will start being shut down. This
+	/// happens if you call this method at a higher rate than the rate at which the peer processes
+	/// these notifications, or if the available network bandwidth is too low.
+	///
+	/// For this reason, this method is considered soft-deprecated. You are encouraged to use
+	/// [`NetworkService::notification_sender`] instead.
 	///
 	/// > **Note**: The reason why this is a no-op in the situation where we have no channel is
 	/// > that we don't guarantee message delivery anyway. Networking issues can cause
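The fields added above introduce a registry shared between the service and the worker: a map from `(PeerId, ConsensusEngineId)` to a cheap-to-clone `NotificationsSink`. The sketch below shows the locking discipline the service methods follow (clone the sink under the lock, release the lock, then send); all types here are simplified stand-ins for illustration, not the real sc-network definitions:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::Mutex;

// Stand-ins: the real keys are `(PeerId, ConsensusEngineId)` and the real
// value type is `NotificationsSink`.
type PeerId = String;
type ConsensusEngineId = [u8; 4];

#[derive(Clone)]
struct FakeSink;

impl FakeSink {
	fn send_sync_notification(&self, _message: Vec<u8>) { /* ... */ }
}

type Registry = Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), FakeSink>>>;

fn send(registry: &Registry, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
	// Clone the sink while holding the lock, then release the lock before
	// doing any actual work, mirroring `write_notification` below.
	let sink = {
		let sinks = registry.lock();
		match sinks.get(&(target, engine_id)) {
			Some(sink) => sink.clone(),
			// No open substream: silently discard, as the API documents.
			None => return,
		}
	};
	sink.send_sync_notification(message);
}
```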
@@ -551,14 +576,145 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
 	/// > between the remote voluntarily closing a substream or a network error
 	/// > preventing the message from being delivered.
 	///
-	/// The protocol must have been registered with `register_notifications_protocol`.
+	/// The protocol must have been registered with `register_notifications_protocol` or
+	/// `NetworkConfiguration::notifications_protocols`.
 	///
 	pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
-		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::WriteNotification {
-			target,
+		// We clone the `NotificationsSink` in order to be able to unlock the network-wide
+		// `peers_notifications_sinks` mutex as soon as possible.
+		let sink = {
+			let peers_notifications_sinks = self.peers_notifications_sinks.lock();
+			if let Some(sink) = peers_notifications_sinks.get(&(target, engine_id)) {
+				sink.clone()
+			} else {
+				// Notification silently discarded, as documented.
+				return;
+			}
+		};
+
+		// Used later for the metrics report.
+		let message_len = message.len();
+
+		// Determine the wire protocol name corresponding to this `engine_id`.
+		let protocol_name = self.protocol_name_by_engine.lock().get(&engine_id).cloned();
+		if let Some(protocol_name) = protocol_name {
+			// For backwards-compatibility reason, we have to duplicate the message and pass it
+			// in the situation where the remote still uses the legacy substream.
+			let fallback = codec::Encode::encode(&{
+				protocol::message::generic::Message::<(), (), (), ()>::Consensus({
+					protocol::message::generic::ConsensusMessage {
+						engine_id,
+						data: message.clone(),
+					}
+				})
+			});
+
+			sink.send_sync_notification(&protocol_name, fallback, message);
+		} else {
+			return;
+		}
+
+		if let Some(notifications_sizes_metric) = self.notifications_sizes_metric.as_ref() {
+			notifications_sizes_metric
+				.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
+				.observe(message_len as f64);
+		}
+	}
+
+	/// Obtains a [`NotificationSender`] for a connected peer, if it exists.
+	///
+	/// A `NotificationSender` is scoped to a particular connection to the peer that holds
+	/// a receiver. With a `NotificationSender` at hand, sending a notification is done in two
+	/// steps:
+	///
+	/// 1.  [`NotificationSender::ready`] is used to wait for the sender to become ready
+	/// for another notification, yielding a [`NotificationSenderReady`] token.
+	/// 2.  [`NotificationSenderReady::send`] enqueues the notification for sending. This operation
+	/// can only fail if the underlying notification substream or connection has suddenly closed.
+	///
+	/// An error is returned either by `notification_sender`, by [`NotificationSender::ready`],
+	/// or by [`NotificationSenderReady::send`] if there exists no open notifications substream
+	/// with that combination of peer and protocol, or if the remote has asked to close the
+	/// notifications substream. If that happens, it is guaranteed that an
+	/// [`Event::NotificationStreamClosed`] has been generated on the stream returned by
+	/// [`NetworkService::event_stream`].
+	///
+	/// If the remote requests to close the notifications substream, all notifications successfully
+	/// enqueued using [`NotificationSenderReady::send`] will finish being sent out before the
+	/// substream actually gets closed, but attempting to enqueue more notifications will now
+	/// return an error. It is however possible for the entire connection to be abruptly closed,
+	/// in which case enqueued notifications will be lost.
+	///
+	/// The protocol must have been registered with `register_notifications_protocol` or
+	/// `NetworkConfiguration::notifications_protocols`.
+	///
+	/// # Usage
+	///
+	/// This method returns a struct that allows waiting until there is space available in the
+	/// buffer of messages towards the given peer.
+	/// If the peer processes notifications at a slower rate than we send them, this buffer will
+	/// quickly fill up.
+	///
+	/// As such, you should never do something like this:
+	///
+	/// ```ignore
+	/// // Do NOT do this
+	/// for peer in peers {
+	/// 	if let Ok(n) = network.notification_sender(peer, ...) {
+	/// 		if let Ok(s) = n.ready().await {
+	/// 			let _ = s.send(...);
+	/// 		}
+	/// 	}
+	/// }
+	/// ```
+	///
+	/// Doing so would slow down all peers to the rate of the slowest one. A malicious or
+	/// malfunctioning peer could intentionally process notifications at a very slow rate.
+	///
+	/// Instead, you are encouraged to maintain your own buffer of notifications on top of the one
+	/// maintained by `sc-network`, and use `notification_sender` to progressively send out
+	/// elements from your buffer. If this additional buffer is full (which will happen at some
+	/// point if the peer is too slow to process notifications), appropriate measures can be taken,
+	/// such as removing non-critical notifications from the buffer or disconnecting the peer
+	/// using [`NetworkService::disconnect_peer`].
+	///
+	///
+	///    Notifications              Per-peer buffer
+	///      broadcast    +------->   of notifications   +-->  `notification_sender`  +-->  Internet
+	///                       ^       (not covered by
+	///                       |         sc-network)
+	///                       +
+	///      Notifications should be dropped
+	///             if buffer is full
+	///
+	pub fn notification_sender(
+		&self,
+		target: PeerId,
+		engine_id: ConsensusEngineId,
+	) -> Result<NotificationSender, NotificationSenderError> {
+		// We clone the `NotificationsSink` in order to be able to unlock the network-wide
+		// `peers_notifications_sinks` mutex as soon as possible.
+		let sink = {
+			let peers_notifications_sinks = self.peers_notifications_sinks.lock();
+			if let Some(sink) = peers_notifications_sinks.get(&(target, engine_id)) {
+				sink.clone()
+			} else {
+				return Err(NotificationSenderError::Closed);
+			}
+		};
+
+		// Determine the wire protocol name corresponding to this `engine_id`.
+		let protocol_name = match self.protocol_name_by_engine.lock().get(&engine_id).cloned() {
+			Some(p) => p,
+			None => return Err(NotificationSenderError::BadProtocol),
+		};
+
+		Ok(NotificationSender {
+			sink,
+			protocol_name,
 			engine_id,
-			message,
-		});
+			notification_size_metric: self.notifications_sizes_metric.as_ref().map(|histogram| {
+				histogram.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
+			}),
+		})
 	}
 
 	/// Returns a stream containing the events that happen on the network.
@@ -595,9 +751,11 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
 		engine_id: ConsensusEngineId,
 		protocol_name: impl Into<Cow<'static, [u8]>>,
 	) {
+		let protocol_name = protocol_name.into();
+		self.protocol_name_by_engine.lock().insert(engine_id, protocol_name.clone());
 		let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RegisterNotifProtocol {
 			engine_id,
-			protocol_name: protocol_name.into(),
+			protocol_name,
 		});
 	}
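The `# Usage` section above recommends layering your own bounded buffer on top of the one managed by `sc-network`. One possible shape for that, sketched under the assumption of one task per peer draining a bounded `futures::channel::mpsc` queue; the helper `peer_sender_task` is hypothetical, not part of the API, and the producing side would use `try_send`, treating a full queue as its cue to drop data or disconnect the peer:

```rust
use std::sync::Arc;

use futures::channel::mpsc;
use futures::StreamExt;
use libp2p::PeerId;
use sp_runtime::ConsensusEngineId;

// `NetworkService` and `ExHashT` are the types defined in this crate
// (sc-network); `peer_sender_task` itself is a hypothetical caller-side helper.
async fn peer_sender_task<B, H>(
	net: Arc<crate::NetworkService<B, H>>,
	peer: PeerId,
	engine_id: ConsensusEngineId,
	// Bounded queue created elsewhere with `mpsc::channel(N)`.
	mut queue: mpsc::Receiver<Vec<u8>>,
) where
	B: sp_runtime::traits::Block + 'static,
	H: crate::ExHashT,
{
	while let Some(notification) = queue.next().await {
		// The sender is scoped to the current substream, so re-obtain it per message.
		let sender = match net.notification_sender(peer.clone(), engine_id) {
			Ok(s) => s,
			// No open substream (or protocol not registered): drop this peer's queue.
			Err(_) => return,
		};
		// Wait until the peer's buffer has room, then enqueue.
		match sender.ready().await {
			Ok(slot) => { let _ = slot.send(notification); },
			Err(_) => return,
		}
	}
}
```

Because each peer only back-pressures its own queue, a slow or malicious peer cannot stall delivery to the others, which is exactly the failure mode the docs warn about.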
@@ -813,6 +971,87 @@ impl<B, H> NetworkStateInfo for NetworkService<B, H>
 	}
 }
 
+/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol.
+#[must_use]
+pub struct NotificationSender {
+	sink: NotificationsSink,
+
+	/// Name of the protocol on the wire.
+	protocol_name: Cow<'static, [u8]>,
+
+	/// Engine ID used for the fallback message.
+	engine_id: ConsensusEngineId,
+
+	/// Field extracted from the [`Metrics`] struct and necessary to report the
+	/// notifications-related metrics.
+	notification_size_metric: Option<Histogram>,
+}
+
+impl NotificationSender {
+	/// Returns a future that resolves when the `NotificationSender` is ready to send a notification.
+	pub async fn ready<'a>(&'a self) -> Result<NotificationSenderReady<'a>, NotificationSenderError> {
+		Ok(NotificationSenderReady {
+			ready: match self.sink.reserve_notification(&self.protocol_name).await {
+				Ok(r) => r,
+				Err(()) => return Err(NotificationSenderError::Closed),
+			},
+			engine_id: self.engine_id,
+			notification_size_metric: self.notification_size_metric.clone(),
+		})
+	}
+}
+
+/// Reserved slot in the notifications buffer, ready to accept data.
+#[must_use]
+pub struct NotificationSenderReady<'a> {
+	ready: Ready<'a>,
+
+	/// Engine ID used for the fallback message.
+	engine_id: ConsensusEngineId,
+
+	/// Field extracted from the [`Metrics`] struct and necessary to report the
+	/// notifications-related metrics.
+	notification_size_metric: Option<Histogram>,
+}
+
+impl<'a> NotificationSenderReady<'a> {
+	/// Consumes this slot's reservation and actually queues the notification.
+	pub fn send(self, notification: impl Into<Vec<u8>>) -> Result<(), NotificationSenderError> {
+		let notification = notification.into();
+
+		if let Some(notification_size_metric) = &self.notification_size_metric {
+			notification_size_metric.observe(notification.len() as f64);
+		}
+
+		// For backwards-compatibility reason, we have to duplicate the message and pass it
+		// in the situation where the remote still uses the legacy substream.
+		let fallback = codec::Encode::encode(&{
+			protocol::message::generic::Message::<(), (), (), ()>::Consensus({
+				protocol::message::generic::ConsensusMessage {
+					engine_id: self.engine_id,
+					data: notification.clone(),
+				}
+			})
+		});
+
+		self.ready.send(fallback, notification)
+			.map_err(|()| NotificationSenderError::Closed)
+	}
+}
+
+/// Error returned by [`NetworkService::notification_sender`].
+#[derive(Debug, derive_more::Display, derive_more::Error)]
+pub enum NotificationSenderError {
+	/// The notification receiver has been closed, usually because the underlying connection
+	/// closed.
+	///
+	/// Some of the notifications most recently sent may not have been received. However,
+	/// the peer may still be connected, and a new `NotificationSender` for the same
+	/// protocol can be obtained from [`NetworkService::notification_sender`].
+	Closed,
+	/// Protocol name hasn't been registered.
+	BadProtocol,
+}
+
 /// Messages sent from the `NetworkService` to the `NetworkWorker`.
 ///
 /// Each entry corresponds to a method of `NetworkService`.
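Putting the two-step API together, here is a minimal hedged example of sending a single notification and distinguishing the two error cases; `send_one` is a hypothetical helper assumed to live where `NotificationSender` and `NotificationSenderError` are in scope:

```rust
// Hedged sketch: one reserve-then-send round trip through the new API.
async fn send_one(sender: &NotificationSender, payload: Vec<u8>) -> bool {
	match sender.ready().await {
		// Space was reserved in the peer's buffer; enqueue the payload.
		Ok(slot) => slot.send(payload).is_ok(),
		// The substream or connection closed; a `NotificationStreamClosed`
		// event is guaranteed to appear on the event stream.
		Err(NotificationSenderError::Closed) => false,
		// The engine ID was never registered; a programming error on our side.
		Err(NotificationSenderError::BadProtocol) => false,
	}
}
```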
@@ -826,11 +1065,6 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
 	AddKnownAddress(PeerId, Multiaddr),
 	SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
 	EventStream(out_events::Sender),
-	WriteNotification {
-		message: Vec<u8>,
-		engine_id: ConsensusEngineId,
-		target: PeerId,
-	},
 	RegisterNotifProtocol {
 		engine_id: ConsensusEngineId,
 		protocol_name: Cow<'static, [u8]>,
@@ -867,6 +1101,9 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
 	metrics: Option<Metrics>,
 	/// The `PeerId`'s of all boot nodes.
 	boot_node_ids: Arc<HashSet<PeerId>>,
+	/// For each peer and protocol combination, an object that allows sending notifications to
+	/// that peer. Shared with the [`NetworkService`].
+	peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
 }
 
 struct Metrics {
@@ -889,7 +1126,6 @@ struct Metrics {
 	listeners_local_addresses: Gauge<U64>,
 	listeners_errors_total: Counter<U64>,
 	network_per_sec_bytes: GaugeVec<U64>,
-	notifications_queues_size: HistogramVec,
 	notifications_sizes: HistogramVec,
 	notifications_streams_closed_total: CounterVec<U64>,
 	notifications_streams_opened_total: CounterVec<U64>,
@@ -1002,16 +1238,6 @@ impl Metrics {
 				), &["direction"]
 			)?, registry)?,
-			notifications_queues_size: register(HistogramVec::new(
-				HistogramOpts {
-					common_opts: Opts::new(
-						"sub_libp2p_notifications_queues_size",
-						"Total size of all the notification queues"
-					),
-					buckets: vec![0.0, 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 511.0, 512.0],
-				},
-				&["protocol"]
-			)?, registry)?,
 			notifications_sizes: register(HistogramVec::new(
 				HistogramOpts {
 					common_opts: Opts::new(
@@ -1088,27 +1314,6 @@ impl Metrics {
 			)?, registry)?,
 		})
 	}
-
-	fn update_with_network_event(&self, event: &Event) {
-		match event {
-			Event::NotificationStreamOpened { engine_id, .. } => {
-				self.notifications_streams_opened_total
-					.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id)]).inc();
-			},
-			Event::NotificationStreamClosed { engine_id, .. } => {
-				self.notifications_streams_closed_total
-					.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id)]).inc();
-			},
-			Event::NotificationsReceived { messages, .. } => {
-				for (engine_id, message) in messages {
-					self.notifications_sizes
-						.with_label_values(&["in", &maybe_utf8_bytes_to_string(engine_id)])
-						.observe(message.len() as f64);
-				}
-			},
-			_ => {}
-		}
-	}
 }
 
 impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
@@ -1162,14 +1367,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 					this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
 				ServiceToWorkerMsg::EventStream(sender) =>
 					this.event_streams.push(sender),
-				ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => {
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.notifications_sizes
-							.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
-							.observe(message.len() as f64);
-					}
-					this.network_service.user_protocol_mut().write_notification(target, engine_id, message)
-				},
 				ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
 					this.network_service
 						.register_notifications_protocol(engine_id, protocol_name);
@@ -1237,11 +1434,82 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
 						.inc();
 				}
 			},
-			Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => {
+			Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { remote, engine_id, notifications_sink, role })) => {
 				if let Some(metrics) = this.metrics.as_ref() {
-					metrics.update_with_network_event(&ev);
+					metrics.notifications_streams_opened_total
+						.with_label_values(&[&maybe_utf8_bytes_to_string(&engine_id)]).inc();
 				}
-				this.event_streams.send(ev);
+				{
+					let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
+					peers_notifications_sinks.insert((remote.clone(), engine_id), notifications_sink);
+				}
+				this.event_streams.send(Event::NotificationStreamOpened {
+					remote,
+					engine_id,
+					role,
+				});
+			},
+			Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced { remote, engine_id, notifications_sink })) => {
+				let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
+				if let Some(s) = peers_notifications_sinks.get_mut(&(remote, engine_id)) {
+					*s = notifications_sink;
+				} else {
+					log::error!(
+						target: "sub-libp2p",
+						"NotificationStreamReplaced for non-existing substream"
+					);
+				}
+
+				// TODO: Notifications might have been lost as a result of the previous
+				// connection being dropped, and as a result it would be preferable to notify
+				// the users of this fact by simulating the substream being closed then
+				// reopened.
+				// The code below doesn't compile because `role` is unknown. Propagating the
+				// handshake of the secondary connections is quite an invasive change and
+				// would conflict with https://github.com/paritytech/substrate/issues/6403.
+				// Considering that dropping notifications is generally regarded as
+				// acceptable, this bug is at the moment intentionally left there and is
+				// intended to be fixed at the same time as
+				// https://github.com/paritytech/substrate/issues/6403.
+				/*this.event_streams.send(Event::NotificationStreamClosed {
+					remote,
+					engine_id,
+				});
+				this.event_streams.send(Event::NotificationStreamOpened {
+					remote,
+					engine_id,
+					role,
+				});*/
+			},
+			Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, engine_id })) => {
+				if let Some(metrics) = this.metrics.as_ref() {
+					metrics.notifications_streams_closed_total
+						.with_label_values(&[&maybe_utf8_bytes_to_string(&engine_id[..])]).inc();
+				}
+				this.event_streams.send(Event::NotificationStreamClosed {
+					remote: remote.clone(),
+					engine_id,
+				});
+				{
+					let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
+					peers_notifications_sinks.remove(&(remote.clone(), engine_id));
+				}
+			},
+			Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { remote, messages })) => {
+				if let Some(metrics) = this.metrics.as_ref() {
+					for (engine_id, message) in &messages {
+						metrics.notifications_sizes
+							.with_label_values(&["in", &maybe_utf8_bytes_to_string(engine_id)])
+							.observe(message.len() as f64);
+					}
+				}
+				this.event_streams.send(Event::NotificationsReceived {
+					remote,
+					messages,
+				});
+			},
+			Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(ev))) => {
+				this.event_streams.send(Event::Dht(ev));
 			},
 			Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, num_established }) => {
 				trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
@@ -1272,7 +1540,10 @@
 								EitherError::A(PingFailure::Timeout)))))))) => "ping-timeout",
 							ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
 								EitherError::A(EitherError::A(EitherError::A(
-								EitherError::B(LegacyConnectionKillError)))))))) => "force-closed",
+								NotifsHandlerError::Legacy(LegacyConnectionKillError)))))))) => "force-closed",
+							ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
+								EitherError::A(EitherError::A(EitherError::A(
+								NotifsHandlerError::SyncNotificationsClogged))))))) => "sync-notifications-clogged",
 							ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) => "protocol-error",
 							ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout) => "keep-alive-timeout",
 						};
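The three event arms above reduce to a small amount of bookkeeping on the shared sinks map: insert on open, overwrite in place on replace, remove on close. A stand-in sketch of that invariant, using deliberately simplified types rather than the real `PeerId`/`NotificationsSink`:

```rust
use std::collections::HashMap;

// Stand-ins: any cheap-to-clone handle plays the role of `NotificationsSink`.
type Key = (u64 /* peer */, [u8; 4] /* engine id */);

enum StreamEvent<S> {
	Opened(Key, S),
	Replaced(Key, S),
	Closed(Key),
}

fn apply<S>(sinks: &mut HashMap<Key, S>, event: StreamEvent<S>) {
	match event {
		// A newly opened substream registers its sink for `write_notification`
		// and `notification_sender` to find.
		StreamEvent::Opened(key, sink) => { sinks.insert(key, sink); },
		// A replacement swaps the sink in place; the entry must already exist.
		StreamEvent::Replaced(key, sink) => match sinks.get_mut(&key) {
			Some(slot) => *slot = sink,
			None => eprintln!("NotificationStreamReplaced for non-existing substream"),
		},
		// Closing removes the entry, so later sends silently become no-ops.
		StreamEvent::Closed(key) => { sinks.remove(&key); },
	}
}
```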
diff --git a/substrate/client/network/src/service/tests.rs b/substrate/client/network/src/service/tests.rs
index 0b02153d3d..f0982e30d9 100644
--- a/substrate/client/network/src/service/tests.rs
+++ b/substrate/client/network/src/service/tests.rs
@@ -345,6 +345,57 @@ fn lots_of_incoming_peers_works() {
 	});
 }
 
+#[test]
+fn notifications_back_pressure() {
+	// Node 1 floods node 2 with notifications. Random sleeps are done on node 2 to simulate the
+	// node being busy. We make sure that all notifications are received.
+
+	const TOTAL_NOTIFS: usize = 10_000;
+
+	let (node1, mut events_stream1, node2, mut events_stream2) = build_nodes_one_proto();
+	let node2_id = node2.local_peer_id();
+
+	let receiver = async_std::task::spawn(async move {
+		let mut received_notifications = 0;
+
+		while received_notifications < TOTAL_NOTIFS {
+			match events_stream2.next().await.unwrap() {
+				Event::NotificationStreamClosed { .. } => panic!(),
+				Event::NotificationsReceived { messages, .. } => {
+					for message in messages {
+						assert_eq!(message.0, ENGINE_ID);
+						assert_eq!(message.1, format!("hello #{}", received_notifications));
+						received_notifications += 1;
+					}
+				}
+				_ => {}
+			};
+
+			if rand::random::<u8>() < 2 {
+				async_std::task::sleep(Duration::from_millis(rand::random::<u64>() % 750)).await;
+			}
+		}
+	});
+
+	async_std::task::block_on(async move {
+		// Wait for the `NotificationStreamOpened`.
+		loop {
+			match events_stream1.next().await.unwrap() {
+				Event::NotificationStreamOpened { .. } => break,
+				_ => {}
+			};
+		}
+
+		// Sending!
+		for num in 0..TOTAL_NOTIFS {
+			let notif = node1.notification_sender(node2_id.clone(), ENGINE_ID).unwrap();
+			notif.ready().await.unwrap().send(format!("hello #{}", num)).unwrap();
+		}
+
+		receiver.await;
+	});
+}
+
 #[test]
 #[should_panic(expected = "don't match the transport")]
 fn ensure_listen_addresses_consistent_with_transport_memory() {