Rework the event system of sc-network (#1370)

This commit introduces a new concept called `NotificationService` which
allows Polkadot protocols to communicate with the underlying
notification protocol implementation directly, without routing events
through `NetworkWorker`. This implies that each protocol has its own
service which it uses to communicate with remote peers and that each
`NotificationService` is unique with respect to the underlying
notification protocol, meaning `NotificationService` for the transaction
protocol can only be used to send and receive transaction-related
notifications.

The `NotificationService` concept introduces two additional benefits:
  * allow protocols to start using custom handshakes
  * allow protocols to accept/reject inbound peers

Previously the validation of inbound connections was solely the
responsibility of `ProtocolController`. This caused issues with light
peers and `SyncingEngine` as `ProtocolController` would accept more
peers than `SyncingEngine` could accept which caused peers to have
differing views of their own states. `SyncingEngine` would reject excess
peers but these rejections were not properly communicated to those peers
causing them to assume that they were accepted.

With `NotificationService`, the local handshake is not sent to remote
peer if peer is rejected which allows it to detect that it was rejected.

This commit also deprecates the use of `NetworkEventStream` for all
notification-related events and going forward only DHT events are
provided through `NetworkEventStream`. If protocols wish to follow each
other's events, they must introduce additional abtractions, as is done
for GRANDPA and transactions protocols by following the syncing protocol
through `SyncEventStream`.

Fixes https://github.com/paritytech/polkadot-sdk/issues/512
Fixes https://github.com/paritytech/polkadot-sdk/issues/514
Fixes https://github.com/paritytech/polkadot-sdk/issues/515
Fixes https://github.com/paritytech/polkadot-sdk/issues/554
Fixes https://github.com/paritytech/polkadot-sdk/issues/556

---
These changes are transferred from
https://github.com/paritytech/substrate/pull/14197 but there are no
functional changes compared to that PR

---------

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
This commit is contained in:
Aaro Altonen
2023-11-28 20:18:52 +02:00
committed by GitHub
parent ec3a61ed86
commit e71c484d5b
102 changed files with 5694 additions and 2603 deletions
+23 -32
View File
@@ -22,12 +22,13 @@ use crate::{
peer_info,
peer_store::PeerStoreHandle,
protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
protocol_controller::SetId,
request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure},
service::traits::Direction,
types::ProtocolName,
ReputationChange,
};
use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{
core::Multiaddr, identify::Info as IdentifyInfo, identity::PublicKey, kad::RecordKey,
@@ -35,7 +36,6 @@ use libp2p::{
};
use parking_lot::Mutex;
use sc_network_common::role::{ObservedRole, Roles};
use sp_runtime::traits::Block as BlockT;
use std::{collections::HashSet, sync::Arc, time::Duration};
@@ -97,8 +97,10 @@ pub enum BehaviourOut {
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
protocol: ProtocolName,
/// Set ID.
set_id: SetId,
/// Direction of the stream.
direction: Direction,
/// If the negotiation didn't use the main name of the protocol (the one in
/// `notifications_protocol`), then this field contains which name has actually been
/// used.
@@ -106,8 +108,6 @@ pub enum BehaviourOut {
negotiated_fallback: Option<ProtocolName>,
/// Object that permits sending notifications to the peer.
notifications_sink: NotificationsSink,
/// Role of the remote.
role: ObservedRole,
/// Received handshake.
received_handshake: Vec<u8>,
},
@@ -120,8 +120,8 @@ pub enum BehaviourOut {
NotificationStreamReplaced {
/// Id of the peer we are connected to.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
protocol: ProtocolName,
/// Set ID.
set_id: SetId,
/// Replacement for the previous [`NotificationsSink`].
notifications_sink: NotificationsSink,
},
@@ -131,16 +131,18 @@ pub enum BehaviourOut {
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
protocol: ProtocolName,
/// Set ID.
set_id: SetId,
},
/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Set ID.
set_id: SetId,
/// Concerned protocol and associated message.
messages: Vec<(ProtocolName, Bytes)>,
notification: Vec<u8>,
},
/// We have obtained identity information from a peer, including the addresses it is listening
@@ -272,44 +274,33 @@ impl<B: BlockT> Behaviour<B> {
}
}
fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole {
if roles.is_authority() {
ObservedRole::Authority
} else if roles.is_full() {
ObservedRole::Full
} else {
ObservedRole::Light
}
}
impl From<CustomMessageOutcome> for BehaviourOut {
fn from(event: CustomMessageOutcome) -> Self {
match event {
CustomMessageOutcome::NotificationStreamOpened {
remote,
protocol,
set_id,
direction,
negotiated_fallback,
roles,
received_handshake,
notifications_sink,
} => BehaviourOut::NotificationStreamOpened {
remote,
protocol,
set_id,
direction,
negotiated_fallback,
role: reported_roles_to_observed_role(roles),
received_handshake,
notifications_sink,
},
CustomMessageOutcome::NotificationStreamReplaced {
remote,
protocol,
set_id,
notifications_sink,
} => BehaviourOut::NotificationStreamReplaced { remote, protocol, notifications_sink },
CustomMessageOutcome::NotificationStreamClosed { remote, protocol } =>
BehaviourOut::NotificationStreamClosed { remote, protocol },
CustomMessageOutcome::NotificationsReceived { remote, messages } =>
BehaviourOut::NotificationsReceived { remote, messages },
CustomMessageOutcome::None => BehaviourOut::None,
} => BehaviourOut::NotificationStreamReplaced { remote, set_id, notifications_sink },
CustomMessageOutcome::NotificationStreamClosed { remote, set_id } =>
BehaviourOut::NotificationStreamClosed { remote, set_id },
CustomMessageOutcome::NotificationsReceived { remote, set_id, notification } =>
BehaviourOut::NotificationsReceived { remote, set_id, notification },
}
}
}
+67 -23
View File
@@ -23,10 +23,11 @@
pub use crate::{
discovery::DEFAULT_KADEMLIA_REPLICATION_FACTOR,
protocol::NotificationsSink,
protocol::{notification_service, NotificationsSink, ProtocolHandlePair},
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
service::traits::NotificationService,
types::ProtocolName,
};
@@ -47,7 +48,6 @@ pub use sc_network_common::{
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_runtime::traits::Block as BlockT;
use std::{
@@ -454,14 +454,14 @@ impl Default for SetConfig {
///
/// > **Note**: As new fields might be added in the future, please consider using the `new` method
/// > and modifiers instead of creating this struct manually.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct NonDefaultSetConfig {
/// Name of the notifications protocols of this set. A substream on this set will be
/// considered established once this protocol is open.
///
/// > **Note**: This field isn't present for the default set, as this is handled internally
/// > by the networking code.
pub notifications_protocol: ProtocolName,
protocol_name: ProtocolName,
/// If the remote reports that it doesn't support the protocol indicated in the
/// `notifications_protocol` field, then each of these fallback names will be tried one by
@@ -469,37 +469,84 @@ pub struct NonDefaultSetConfig {
///
/// If a fallback is used, it will be reported in
/// `sc_network::protocol::event::Event::NotificationStreamOpened::negotiated_fallback`
pub fallback_names: Vec<ProtocolName>,
fallback_names: Vec<ProtocolName>,
/// Handshake of the protocol
///
/// NOTE: Currently custom handshakes are not fully supported. See issue #5685 for more
/// details. This field is temporarily used to allow moving the hardcoded block announcement
/// protocol out of `protocol.rs`.
pub handshake: Option<NotificationHandshake>,
handshake: Option<NotificationHandshake>,
/// Maximum allowed size of single notifications.
pub max_notification_size: u64,
max_notification_size: u64,
/// Base configuration.
pub set_config: SetConfig,
set_config: SetConfig,
/// Notification handle.
///
/// Notification handle is created during `NonDefaultSetConfig` creation and its other half,
/// `Box<dyn NotificationService>` is given to the protocol created the config and
/// `ProtocolHandle` is given to `Notifications` when it initializes itself. This handle allows
/// `Notifications ` to communicate with the protocol directly without relaying events through
/// `sc-network.`
protocol_handle_pair: ProtocolHandlePair,
}
impl NonDefaultSetConfig {
/// Creates a new [`NonDefaultSetConfig`]. Zero slots and accepts only reserved nodes.
pub fn new(notifications_protocol: ProtocolName, max_notification_size: u64) -> Self {
Self {
notifications_protocol,
max_notification_size,
fallback_names: Vec::new(),
handshake: None,
set_config: SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
/// Also returns an object which allows the protocol to communicate with `Notifications`.
pub fn new(
protocol_name: ProtocolName,
fallback_names: Vec<ProtocolName>,
max_notification_size: u64,
handshake: Option<NotificationHandshake>,
set_config: SetConfig,
) -> (Self, Box<dyn NotificationService>) {
let (protocol_handle_pair, notification_service) =
notification_service(protocol_name.clone());
(
Self {
protocol_name,
max_notification_size,
fallback_names,
handshake,
set_config,
protocol_handle_pair,
},
}
notification_service,
)
}
/// Get reference to protocol name.
pub fn protocol_name(&self) -> &ProtocolName {
&self.protocol_name
}
/// Get reference to fallback protocol names.
pub fn fallback_names(&self) -> impl Iterator<Item = &ProtocolName> {
self.fallback_names.iter()
}
/// Get reference to handshake.
pub fn handshake(&self) -> &Option<NotificationHandshake> {
&self.handshake
}
/// Get maximum notification size.
pub fn max_notification_size(&self) -> u64 {
self.max_notification_size
}
/// Get reference to `SetConfig`.
pub fn set_config(&self) -> &SetConfig {
&self.set_config
}
/// Take `ProtocolHandlePair` from `NonDefaultSetConfig`
pub fn take_protocol_handle(self) -> ProtocolHandlePair {
self.protocol_handle_pair
}
/// Modifies the configuration to allow non-reserved nodes.
@@ -703,9 +750,6 @@ pub struct Params<Block: BlockT> {
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,
/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,
}
/// Full network configuration.
+9
View File
@@ -68,6 +68,15 @@ pub enum Error {
/// Name of the protocol registered multiple times.
protocol: ProtocolName,
},
/// Peer does not exist.
#[error("Peer `{0}` does not exist.")]
PeerDoesntExist(PeerId),
/// Channel closed.
#[error("Channel closed")]
ChannelClosed,
/// Connection closed.
#[error("Connection closed")]
ConnectionClosed,
}
// Make `Debug` use the `Display` implementation.
+2 -47
View File
@@ -19,14 +19,12 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.
use crate::{types::ProtocolName, NotificationsSink};
use crate::types::ProtocolName;
use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{kad::record::Key, PeerId};
use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::Block as BlockT;
use sc_network_common::role::ObservedRole;
/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
@@ -92,46 +90,3 @@ pub enum Event {
messages: Vec<(ProtocolName, Bytes)>,
},
}
/// Event sent to `SyncingEngine`
// TODO: remove once `NotificationService` is implemented.
pub enum SyncEvent<B: BlockT> {
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// Received handshake.
received_handshake: BlockAnnouncesHandshake<B>,
/// Notification sink.
sink: NotificationsSink,
/// Is the connection inbound.
inbound: bool,
/// Channel for reporting accept/reject of the substream.
tx: oneshot::Sender<bool>,
},
/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
},
/// Notification sink was replaced.
NotificationSinkReplaced {
/// Node we closed the substream with.
remote: PeerId,
/// Notification sink.
sink: NotificationsSink,
},
/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<Bytes>,
},
}
+13 -9
View File
@@ -244,7 +244,6 @@
mod behaviour;
mod protocol;
mod service;
#[cfg(test)]
mod mock;
@@ -258,25 +257,30 @@ pub mod peer_info;
pub mod peer_store;
pub mod protocol_controller;
pub mod request_responses;
pub mod service;
pub mod transport;
pub mod types;
pub mod utils;
pub use event::{DhtEvent, Event, SyncEvent};
pub use event::{DhtEvent, Event};
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{Config, IfDisconnected, RequestFailure};
pub use sc_network_common::{role::ObservedRole, types::ReputationChange};
pub use sc_network_common::{
role::{ObservedRole, Roles},
types::ReputationChange,
};
pub use service::{
signature::Signature,
traits::{
KademliaKey, NetworkBlock, NetworkDHTProvider, NetworkEventStream, NetworkNotification,
NetworkPeers, NetworkRequest, NetworkSigner, NetworkStateInfo, NetworkStatus,
NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady,
KademliaKey, MessageSink, NetworkBlock, NetworkDHTProvider, NetworkEventStream,
NetworkNotification, NetworkPeers, NetworkRequest, NetworkSigner, NetworkStateInfo,
NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest,
NotificationSender as NotificationSenderT, NotificationSenderError,
NotificationSenderReady, NotificationService,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink,
OutboundFailure, PublicKey,
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure,
PublicKey,
};
pub use types::ProtocolName;
+9
View File
@@ -20,6 +20,7 @@
use crate::{peer_store::PeerStoreProvider, protocol_controller::ProtocolHandle, ReputationChange};
use libp2p::PeerId;
use sc_network_common::role::ObservedRole;
use std::collections::HashSet;
/// No-op `PeerStore`.
@@ -49,6 +50,14 @@ impl PeerStoreProvider for MockPeerStore {
0
}
fn peer_role(&self, _peer_id: &PeerId) -> Option<ObservedRole> {
None
}
fn set_peer_role(&mut self, _peer_id: &PeerId, _role: ObservedRole) {
unimplemented!();
}
fn outgoing_candidates(&self, _count: usize, _ignored: HashSet<&PeerId>) -> Vec<PeerId> {
unimplemented!()
}
+39 -2
View File
@@ -23,7 +23,7 @@ use libp2p::PeerId;
use log::trace;
use parking_lot::Mutex;
use partial_sort::PartialSort;
use sc_network_common::types::ReputationChange;
use sc_network_common::{role::ObservedRole, types::ReputationChange};
use std::{
cmp::{Ord, Ordering, PartialOrd},
collections::{hash_map::Entry, HashMap, HashSet},
@@ -66,9 +66,15 @@ pub trait PeerStoreProvider: Debug + Send {
/// Adjust peer reputation.
fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange);
/// Set peer role.
fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole);
/// Get peer reputation.
fn peer_reputation(&self, peer_id: &PeerId) -> i32;
/// Get peer role, if available.
fn peer_role(&self, peer_id: &PeerId) -> Option<ObservedRole>;
/// Get candidates with highest reputations for initiating outgoing connections.
fn outgoing_candidates(&self, count: usize, ignored: HashSet<&PeerId>) -> Vec<PeerId>;
}
@@ -96,10 +102,18 @@ impl PeerStoreProvider for PeerStoreHandle {
self.inner.lock().report_peer(peer_id, change)
}
fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole) {
self.inner.lock().set_peer_role(peer_id, role)
}
fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
self.inner.lock().peer_reputation(peer_id)
}
fn peer_role(&self, peer_id: &PeerId) -> Option<ObservedRole> {
self.inner.lock().peer_role(peer_id)
}
fn outgoing_candidates(&self, count: usize, ignored: HashSet<&PeerId>) -> Vec<PeerId> {
self.inner.lock().outgoing_candidates(count, ignored)
}
@@ -122,13 +136,19 @@ impl PeerStoreHandle {
#[derive(Debug, Clone, Copy)]
struct PeerInfo {
/// Reputation of the peer.
reputation: i32,
/// Instant when the peer was last updated.
last_updated: Instant,
/// Role of the peer, if known.
role: Option<ObservedRole>,
}
impl Default for PeerInfo {
fn default() -> Self {
Self { reputation: 0, last_updated: Instant::now() }
Self { reputation: 0, last_updated: Instant::now(), role: None }
}
}
@@ -242,10 +262,27 @@ impl PeerStoreInner {
}
}
fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole) {
log::trace!(target: LOG_TARGET, "Set {peer_id} role to {role:?}");
match self.peers.entry(*peer_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().role = Some(role);
},
Entry::Vacant(entry) => {
entry.insert(PeerInfo { role: Some(role), ..Default::default() });
},
}
}
fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
self.peers.get(peer_id).map_or(0, |info| info.reputation)
}
fn peer_role(&self, peer_id: &PeerId) -> Option<ObservedRole> {
self.peers.get(peer_id).map_or(None, |info| info.role)
}
fn outgoing_candidates(&self, count: usize, ignored: HashSet<&PeerId>) -> Vec<PeerId> {
let mut candidates = self
.peers
+163 -284
View File
@@ -20,12 +20,11 @@ use crate::{
config, error,
peer_store::{PeerStoreHandle, PeerStoreProvider},
protocol_controller::{self, SetId},
service::traits::Direction,
types::ProtocolName,
};
use bytes::Bytes;
use codec::{DecodeAll, Encode};
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
use codec::Encode;
use libp2p::{
core::Endpoint,
swarm::{
@@ -34,24 +33,23 @@ use libp2p::{
},
Multiaddr, PeerId,
};
use log::{debug, error, warn};
use log::warn;
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender};
use codec::DecodeAll;
use prometheus_endpoint::Registry;
use sc_network_common::role::Roles;
use sc_utils::mpsc::TracingUnboundedReceiver;
use sp_runtime::traits::Block as BlockT;
use std::{
collections::{HashMap, HashSet},
future::Future,
iter,
pin::Pin,
task::Poll,
};
use std::{collections::HashSet, iter, task::Poll};
use message::{generic::Message as GenericMessage, Message};
use notifications::{Notifications, NotificationsOut};
pub use notifications::{NotificationsSink, NotifsHandlerError, Ready};
pub(crate) use notifications::ProtocolHandle;
pub use notifications::{
notification_service, NotificationsSink, NotifsHandlerError, ProtocolHandlePair, Ready,
};
mod notifications;
@@ -64,85 +62,93 @@ pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 *
/// Identifier of the peerset for the block announces protocol.
const HARDCODED_PEERSETS_SYNC: SetId = SetId::from(0);
mod rep {
use crate::ReputationChange as Rep;
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
}
type PendingSyncSubstreamValidation =
Pin<Box<dyn Future<Output = Result<(PeerId, Roles), PeerId>> + Send>>;
// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT> {
/// Used to report reputation changes.
peer_store_handle: PeerStoreHandle,
/// Handles opening the unique substream and sending and receiving raw messages.
behaviour: Notifications,
/// List of notifications protocols that have been registered.
notification_protocols: Vec<ProtocolName>,
/// If we receive a new "substream open" event that contains an invalid handshake, we ask the
/// inner layer to force-close the substream. Force-closing the substream will generate a
/// "substream closed" event. This is a problem: since we can't propagate the "substream open"
/// event to the outer layers, we also shouldn't propagate this "substream closed" event. To
/// solve this, an entry is added to this map whenever an invalid handshake is received.
/// Entries are removed when the corresponding "substream closed" is later received.
bad_handshake_substreams: HashSet<(PeerId, SetId)>,
/// Connected peers on sync protocol.
peers: HashMap<PeerId, Roles>,
sync_substream_validations: FuturesUnordered<PendingSyncSubstreamValidation>,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
/// Handle to `PeerStore`.
peer_store_handle: PeerStoreHandle,
/// Streams for peers whose handshake couldn't be determined.
bad_handshake_streams: HashSet<PeerId>,
sync_handle: ProtocolHandle,
_marker: std::marker::PhantomData<B>,
}
impl<B: BlockT> Protocol<B> {
/// Create a new instance.
pub fn new(
pub(crate) fn new(
roles: Roles,
registry: &Option<Registry>,
notification_protocols: Vec<config::NonDefaultSetConfig>,
block_announces_protocol: config::NonDefaultSetConfig,
peer_store_handle: PeerStoreHandle,
protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
from_protocol_controllers: TracingUnboundedReceiver<protocol_controller::Message>,
tx: TracingUnboundedSender<crate::event::SyncEvent<B>>,
) -> error::Result<Self> {
let behaviour = {
Notifications::new(
protocol_controller_handles,
from_protocol_controllers,
// NOTE: Block announcement protocol is still very much hardcoded into `Protocol`.
// This protocol must be the first notification protocol given to
// `Notifications`
iter::once(notifications::ProtocolConfig {
name: block_announces_protocol.notifications_protocol.clone(),
fallback_names: block_announces_protocol.fallback_names.clone(),
handshake: block_announces_protocol.handshake.as_ref().unwrap().to_vec(),
max_notification_size: block_announces_protocol.max_notification_size,
})
.chain(notification_protocols.iter().map(|s| notifications::ProtocolConfig {
name: s.notifications_protocol.clone(),
fallback_names: s.fallback_names.clone(),
handshake: s.handshake.as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
max_notification_size: s.max_notification_size,
})),
) -> error::Result<(Self, Vec<ProtocolHandle>)> {
let (behaviour, notification_protocols, handles) = {
let installed_protocols = iter::once(block_announces_protocol.protocol_name().clone())
.chain(notification_protocols.iter().map(|p| p.protocol_name().clone()))
.collect::<Vec<_>>();
// NOTE: Block announcement protocol is still very much hardcoded into
// `Protocol`. This protocol must be the first notification protocol given to
// `Notifications`
let (protocol_configs, handles): (Vec<_>, Vec<_>) = iter::once({
let config = notifications::ProtocolConfig {
name: block_announces_protocol.protocol_name().clone(),
fallback_names: block_announces_protocol.fallback_names().cloned().collect(),
handshake: block_announces_protocol.handshake().as_ref().unwrap().to_vec(),
max_notification_size: block_announces_protocol.max_notification_size(),
};
let (handle, command_stream) =
block_announces_protocol.take_protocol_handle().split();
((config, handle.clone(), command_stream), handle)
})
.chain(notification_protocols.into_iter().map(|s| {
let config = notifications::ProtocolConfig {
name: s.protocol_name().clone(),
fallback_names: s.fallback_names().cloned().collect(),
handshake: s.handshake().as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
max_notification_size: s.max_notification_size(),
};
let (handle, command_stream) = s.take_protocol_handle().split();
((config, handle.clone(), command_stream), handle)
}))
.unzip();
(
Notifications::new(
protocol_controller_handles,
from_protocol_controllers,
registry,
protocol_configs.into_iter(),
),
installed_protocols,
handles,
)
};
let protocol = Self {
peer_store_handle,
behaviour,
notification_protocols: iter::once(block_announces_protocol.notifications_protocol)
.chain(notification_protocols.iter().map(|s| s.notifications_protocol.clone()))
.collect(),
bad_handshake_substreams: Default::default(),
peers: HashMap::new(),
sync_substream_validations: FuturesUnordered::new(),
tx,
sync_handle: handles[0].clone(),
peer_store_handle,
notification_protocols,
bad_handshake_streams: HashSet::new(),
// TODO: remove when `BlockAnnouncesHandshake` is moved away from `Protocol`
_marker: Default::default(),
};
Ok(protocol)
Ok((protocol, handles))
}
pub fn num_sync_peers(&self) -> usize {
self.sync_handle.num_peers()
}
/// Returns the list of all the peers we have an open channel to.
@@ -163,21 +169,12 @@ impl<B: BlockT> Protocol<B> {
}
}
/// Returns the number of peers we're connected to on sync protocol.
pub fn num_connected_peers(&self) -> usize {
self.peers.len()
}
/// Set handshake for the notification protocol.
pub fn set_notification_handshake(&mut self, protocol: ProtocolName, handshake: Vec<u8>) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.behaviour.set_notif_protocol_handshake(SetId::from(index), handshake);
} else {
error!(
target: "sub-libp2p",
"set_notification_handshake with unknown protocol: {}",
protocol
);
/// Check if role is available for `peer_id` by attempt to decode the handshake to roles and if
/// that fails, check if the role has been registered to `PeerStore`.
fn role_available(&self, peer_id: &PeerId, handshake: &Vec<u8>) -> bool {
match Roles::decode_all(&mut &handshake[..]) {
Ok(_) => true,
Err(_) => self.peer_store_handle.peer_role(&peer_id).is_some(),
}
}
}
@@ -189,25 +186,42 @@ pub enum CustomMessageOutcome {
/// Notification protocols have been opened with a remote.
NotificationStreamOpened {
remote: PeerId,
protocol: ProtocolName,
// protocol: ProtocolName,
set_id: SetId,
/// Direction of the stream.
direction: Direction,
/// See [`crate::Event::NotificationStreamOpened::negotiated_fallback`].
negotiated_fallback: Option<ProtocolName>,
roles: Roles,
/// Received handshake.
received_handshake: Vec<u8>,
/// Notification sink.
notifications_sink: NotificationsSink,
},
/// The [`NotificationsSink`] of some notification protocols need an update.
NotificationStreamReplaced {
// Peer ID.
remote: PeerId,
protocol: ProtocolName,
/// Set ID.
set_id: SetId,
/// New notification sink.
notifications_sink: NotificationsSink,
},
/// Notification protocols have been closed with a remote.
NotificationStreamClosed { remote: PeerId, protocol: ProtocolName },
NotificationStreamClosed {
// Peer ID.
remote: PeerId,
/// Set ID.
set_id: SetId,
},
/// Messages have been received on one or more notifications protocols.
NotificationsReceived { remote: PeerId, messages: Vec<(ProtocolName, Bytes)> },
/// Now connected to a new peer for syncing purposes.
None,
NotificationsReceived {
// Peer ID.
remote: PeerId,
/// Set ID.
set_id: SetId,
/// Received notification.
notification: Vec<u8>,
},
}
impl<B: BlockT> NetworkBehaviour for Protocol<B> {
@@ -274,23 +288,6 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
cx: &mut std::task::Context,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::OutEvent, THandlerInEvent<Self>>> {
while let Poll::Ready(Some(validation_result)) =
self.sync_substream_validations.poll_next_unpin(cx)
{
match validation_result {
Ok((peer, roles)) => {
self.peers.insert(peer, roles);
},
Err(peer) => {
log::debug!(
target: "sub-libp2p",
"`SyncingEngine` rejected stream"
);
self.behaviour.disconnect_peer(&peer, HARDCODED_PEERSETS_SYNC);
},
}
}
let event = match self.behaviour.poll(cx, params) {
Poll::Pending => return Poll::Pending,
Poll::Ready(ToSwarm::GenerateEvent(ev)) => ev,
@@ -307,204 +304,86 @@ impl<B: BlockT> NetworkBehaviour for Protocol<B> {
NotificationsOut::CustomProtocolOpen {
peer_id,
set_id,
direction,
received_handshake,
notifications_sink,
negotiated_fallback,
inbound,
} => {
// Set number 0 is hardcoded the default set of peers we sync from.
..
} =>
if set_id == HARDCODED_PEERSETS_SYNC {
// `received_handshake` can be either a `Status` message if received from the
// legacy substream ,or a `BlockAnnouncesHandshake` if received from the block
// announces substream.
match <Message<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => {
let roles = handshake.roles;
let handshake = BlockAnnouncesHandshake::<B> {
roles: handshake.roles,
best_number: handshake.best_number,
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
inbound,
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));
CustomMessageOutcome::None
},
Ok(msg) => {
debug!(
target: "sync",
"Expected Status message from {}, but got {:?}",
peer_id,
msg,
);
self.peer_store_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
},
Err(err) => {
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(
&mut &received_handshake[..],
) {
Ok(handshake) => {
let roles = handshake.roles;
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(
crate::SyncEvent::NotificationStreamOpened {
inbound,
remote: peer_id,
received_handshake: handshake,
sink: notifications_sink,
tx,
},
);
self.sync_substream_validations.push(Box::pin(async move {
match rx.await {
Ok(accepted) =>
if accepted {
Ok((peer_id, roles))
} else {
Err(peer_id)
},
Err(_) => Err(peer_id),
}
}));
CustomMessageOutcome::None
},
Err(err2) => {
log::debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {} & {}",
peer_id,
received_handshake,
err,
err2,
);
self.peer_store_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
},
}
},
}
let _ = self.sync_handle.report_substream_opened(
peer_id,
direction,
received_handshake,
negotiated_fallback,
notifications_sink,
);
None
} else {
match (
Roles::decode_all(&mut &received_handshake[..]),
self.peers.get(&peer_id),
) {
(Ok(roles), _) => CustomMessageOutcome::NotificationStreamOpened {
match self.role_available(&peer_id, &received_handshake) {
true => Some(CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)].clone(),
set_id,
direction,
negotiated_fallback,
roles,
received_handshake,
notifications_sink,
}),
false => {
self.bad_handshake_streams.insert(peer_id);
None
},
(Err(_), Some(roles)) if received_handshake.is_empty() => {
// As a convenience, we allow opening substreams for "external"
// notification protocols with an empty handshake. This fetches the
// roles from the locally-known roles.
// TODO: remove this after https://github.com/paritytech/substrate/issues/5685
CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)].clone(),
negotiated_fallback,
roles: *roles,
received_handshake,
notifications_sink,
}
},
(Err(err), _) => {
debug!(target: "sync", "Failed to parse remote handshake: {}", err);
self.bad_handshake_substreams.insert((peer_id, set_id));
self.behaviour.disconnect_peer(&peer_id, set_id);
self.peer_store_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
},
}
}
},
NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationSinkReplaced {
remote: peer_id,
sink: notifications_sink,
});
CustomMessageOutcome::None
} else {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)].clone(),
notifications_sink,
}
},
NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
if self.bad_handshake_substreams.remove(&(peer_id, set_id)) {
// The substream that has just been closed had been opened with a bad
// handshake. The outer layers have never received an opening event about this
// substream, and consequently shouldn't receive a closing event either.
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationStreamClosed {
remote: peer_id,
});
self.peers.remove(&peer_id);
CustomMessageOutcome::None
NotificationsOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } =>
if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self
.sync_handle
.report_notification_sink_replaced(peer_id, notifications_sink);
None
} else {
CustomMessageOutcome::NotificationStreamClosed {
remote: peer_id,
protocol: self.notification_protocols[usize::from(set_id)].clone(),
}
(!self.bad_handshake_streams.contains(&peer_id)).then_some(
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
set_id,
notifications_sink,
},
)
},
NotificationsOut::CustomProtocolClosed { peer_id, set_id } => {
if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.sync_handle.report_substream_closed(peer_id);
None
} else {
(!self.bad_handshake_streams.remove(&peer_id)).then_some(
CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, set_id },
)
}
},
NotificationsOut::Notification { peer_id, set_id, message } => {
if self.bad_handshake_substreams.contains(&(peer_id, set_id)) {
CustomMessageOutcome::None
} else if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self.tx.unbounded_send(crate::SyncEvent::NotificationsReceived {
remote: peer_id,
messages: vec![message.freeze()],
});
CustomMessageOutcome::None
if set_id == HARDCODED_PEERSETS_SYNC {
let _ = self
.sync_handle
.report_notification_received(peer_id, message.freeze().into());
None
} else {
let protocol_name = self.notification_protocols[usize::from(set_id)].clone();
CustomMessageOutcome::NotificationsReceived {
remote: peer_id,
messages: vec![(protocol_name, message.freeze())],
}
(!self.bad_handshake_streams.contains(&peer_id)).then_some(
CustomMessageOutcome::NotificationsReceived {
remote: peer_id,
set_id,
notification: message.freeze().into(),
},
)
}
},
};
if !matches!(outcome, CustomMessageOutcome::None) {
return Poll::Ready(ToSwarm::GenerateEvent(outcome))
match outcome {
Some(event) => Poll::Ready(ToSwarm::GenerateEvent(event)),
None => {
cx.waker().wake_by_ref();
Poll::Pending
},
}
// This block can only be reached if an event was pulled from the behaviour and that
// resulted in `CustomMessageOutcome::None`. Since there might be another pending
// message from the behaviour, the task is scheduled again.
cx.waker().wake_by_ref();
Poll::Pending
}
}
@@ -29,6 +29,7 @@ use sc_network_common::message::RequestId;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
/// Type alias for using the message type using block type parameters.
#[allow(unused)]
pub type Message<B> = generic::Message<
<B as BlockT>::Header,
<B as BlockT>::Hash,
@@ -22,9 +22,13 @@
pub use self::{
behaviour::{Notifications, NotificationsOut, ProtocolConfig},
handler::{NotificationsSink, NotifsHandlerError, Ready},
service::{notification_service, ProtocolHandlePair},
};
pub(crate) use self::service::ProtocolHandle;
mod behaviour;
mod handler;
mod service;
mod tests;
mod upgrade;
File diff suppressed because it is too large Load Diff
@@ -58,9 +58,12 @@
//! [`NotifsHandlerIn::Open`] has gotten an answer.
use crate::{
protocol::notifications::upgrade::{
NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutSubstream,
UpgradeCollec,
protocol::notifications::{
service::metrics,
upgrade::{
NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutSubstream,
UpgradeCollec,
},
},
types::ProtocolName,
};
@@ -92,7 +95,7 @@ use std::{
/// Number of pending notifications in asynchronous contexts.
/// See [`NotificationsSink::reserve_notification`] for context.
const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;
pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;
/// Number of pending notifications in synchronous contexts.
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;
@@ -126,11 +129,19 @@ pub struct NotifsHandler {
events_queue: VecDeque<
ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut, NotifsHandlerError>,
>,
/// Metrics.
metrics: Option<Arc<metrics::Metrics>>,
}
impl NotifsHandler {
/// Creates new [`NotifsHandler`].
pub fn new(peer_id: PeerId, endpoint: ConnectedPoint, protocols: Vec<ProtocolConfig>) -> Self {
pub fn new(
peer_id: PeerId,
endpoint: ConnectedPoint,
protocols: Vec<ProtocolConfig>,
metrics: Option<metrics::Metrics>,
) -> Self {
Self {
protocols: protocols
.into_iter()
@@ -148,6 +159,7 @@ impl NotifsHandler {
endpoint,
when_connection_open: Instant::now(),
events_queue: VecDeque::with_capacity(16),
metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
}
}
}
@@ -303,6 +315,8 @@ pub enum NotifsHandlerOut {
OpenDesiredByRemote {
/// Index of the protocol in the list of protocols passed at initialization.
protocol_index: usize,
/// Received handshake.
handshake: Vec<u8>,
},
/// The remote would like the substreams to be closed. Send a [`NotifsHandlerIn::Close`] in
@@ -331,6 +345,36 @@ pub enum NotifsHandlerOut {
#[derive(Debug, Clone)]
pub struct NotificationsSink {
inner: Arc<NotificationsSinkInner>,
metrics: Option<Arc<metrics::Metrics>>,
}
impl NotificationsSink {
/// Create new [`NotificationsSink`].
/// NOTE: only used for testing but must be `pub` as other crates in `client/network` use this.
pub fn new(
peer_id: PeerId,
) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
{
let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
(
NotificationsSink {
inner: Arc::new(NotificationsSinkInner {
peer_id,
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(Some(sync_tx)),
}),
metrics: None,
},
async_rx,
sync_rx,
)
}
/// Get reference to metrics.
pub fn metrics(&self) -> &Option<Arc<metrics::Metrics>> {
&self.metrics
}
}
#[derive(Debug)]
@@ -350,8 +394,8 @@ struct NotificationsSinkInner {
/// Message emitted through the [`NotificationsSink`] and processed by the background task
/// dedicated to the peer.
#[derive(Debug)]
enum NotificationsSinkMessage {
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationsSinkMessage {
/// Message emitted by [`NotificationsSink::reserve_notification`] and
/// [`NotificationsSink::write_notification_now`].
Notification { message: Vec<u8> },
@@ -379,8 +423,8 @@ impl NotificationsSink {
let mut lock = self.inner.sync_channel.lock();
if let Some(tx) = lock.as_mut() {
let result =
tx.try_send(NotificationsSinkMessage::Notification { message: message.into() });
let message = message.into();
let result = tx.try_send(NotificationsSinkMessage::Notification { message });
if result.is_err() {
// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
@@ -476,7 +520,10 @@ impl ConnectionHandler for NotifsHandler {
match protocol_info.state {
State::Closed { pending_opening } => {
self.events_queue.push_back(ConnectionHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote { protocol_index },
NotifsHandlerOut::OpenDesiredByRemote {
protocol_index,
handshake: in_substream_open.handshake,
},
));
protocol_info.state = State::OpenDesiredByRemote {
@@ -531,6 +578,7 @@ impl ConnectionHandler for NotifsHandler {
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(Some(sync_tx)),
}),
metrics: self.metrics.clone(),
};
self.protocols[protocol_index].state = State::Open {
@@ -881,6 +929,7 @@ pub mod tests {
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(Some(sync_tx)),
}),
metrics: None,
};
let (in_substream, out_substream) = MockSubstream::new();
@@ -1040,6 +1089,7 @@ pub mod tests {
},
peer_id: PeerId::random(),
events_queue: VecDeque::new(),
metrics: None,
}
}
@@ -1545,6 +1595,7 @@ pub mod tests {
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(Some(sync_tx)),
}),
metrics: None,
};
handler.protocols[0].state = State::Open {
@@ -1597,7 +1648,7 @@ pub mod tests {
assert!(std::matches!(
handler.poll(cx),
Poll::Ready(ConnectionHandlerEvent::Custom(
NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 },
NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
))
));
assert!(std::matches!(
@@ -0,0 +1,130 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::types::ProtocolName;
use prometheus_endpoint::{
self as prometheus, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
U64,
};
use std::sync::Arc;
/// Notification metrics.
#[derive(Debug, Clone)]
pub struct Metrics {
// Total number of opened substreams.
pub notifications_streams_opened_total: CounterVec<U64>,
/// Total number of closed substreams.
pub notifications_streams_closed_total: CounterVec<U64>,
/// In/outbound notification sizes.
pub notifications_sizes: HistogramVec,
}
impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
notifications_sizes: prometheus::register(
HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"substrate_sub_libp2p_notifications_sizes",
"Sizes of the notifications send to and received from all nodes",
),
buckets: prometheus::exponential_buckets(64.0, 4.0, 8)
.expect("parameters are always valid values; qed"),
},
&["direction", "protocol"],
)?,
registry,
)?,
notifications_streams_closed_total: prometheus::register(
CounterVec::new(
Opts::new(
"substrate_sub_libp2p_notifications_streams_closed_total",
"Total number of notification substreams that have been closed",
),
&["protocol"],
)?,
registry,
)?,
notifications_streams_opened_total: prometheus::register(
CounterVec::new(
Opts::new(
"substrate_sub_libp2p_notifications_streams_opened_total",
"Total number of notification substreams that have been opened",
),
&["protocol"],
)?,
registry,
)?,
})
}
}
/// Register metrics.
pub fn register(registry: &Registry) -> Result<Metrics, PrometheusError> {
Metrics::register(registry)
}
/// Register opened substream to Prometheus.
pub fn register_substream_opened(metrics: &Option<Metrics>, protocol: &ProtocolName) {
if let Some(metrics) = metrics {
metrics.notifications_streams_opened_total.with_label_values(&[&protocol]).inc();
}
}
/// Register closed substream to Prometheus.
pub fn register_substream_closed(metrics: &Option<Metrics>, protocol: &ProtocolName) {
if let Some(metrics) = metrics {
metrics
.notifications_streams_closed_total
.with_label_values(&[&protocol[..]])
.inc();
}
}
/// Register sent notification to Prometheus.
pub fn register_notification_sent(
metrics: &Option<Arc<Metrics>>,
protocol: &ProtocolName,
size: usize,
) {
if let Some(metrics) = metrics {
metrics
.notifications_sizes
.with_label_values(&["out", protocol])
.observe(size as f64);
}
}
/// Register received notification to Prometheus.
pub fn register_notification_received(
metrics: &Option<Metrics>,
protocol: &ProtocolName,
size: usize,
) {
if let Some(metrics) = metrics {
metrics
.notifications_sizes
.with_label_values(&["in", protocol])
.observe(size as f64);
}
}
@@ -0,0 +1,634 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Notification service implementation.
use crate::{
error,
protocol::notifications::handler::NotificationsSink,
service::traits::{
Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
},
types::ProtocolName,
};
use futures::{
stream::{FuturesUnordered, Stream},
StreamExt,
};
use libp2p::PeerId;
use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{collections::HashMap, fmt::Debug, sync::Arc};
pub(crate) mod metrics;
#[cfg(test)]
mod tests;
/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p";
/// Default command queue size.
const COMMAND_QUEUE_SIZE: usize = 64;
/// Type representing subscribers of a notification protocol.
type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;
/// Type represending a distributable message sink.
/// Detached message sink must carry the protocol name for registering metrics.
///
/// See documentation for [`PeerContext`] for more details.
type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
#[async_trait::async_trait]
impl MessageSink for NotificationSink {
/// Send synchronous `notification` to the peer associated with this [`MessageSink`].
fn send_sync_notification(&self, notification: Vec<u8>) {
let sink = self.lock();
metrics::register_notification_sent(&sink.0.metrics(), &sink.1, notification.len());
sink.0.send_sync_notification(notification);
}
/// Send an asynchronous `notification` to the peer associated with this [`MessageSink`],
/// allowing sender to exercise backpressure.
///
/// Returns an error if the peer does not exist.
async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
// notification sink must be cloned because the lock cannot be held across `.await`
// this makes the implementation less efficient but not prohibitively so as the same
// method is also used by `NetworkService` when sending notifications.
let notification_len = notification.len();
let sink = self.lock().clone();
let permit = sink
.0
.reserve_notification()
.await
.map_err(|_| error::Error::ConnectionClosed)?;
permit.send(notification).map_err(|_| error::Error::ChannelClosed).map(|res| {
metrics::register_notification_sent(&sink.0.metrics(), &sink.1, notification_len);
res
})
}
}
/// Inner notification event to deal with `NotificationsSinks` without exposing that
/// implementation detail to [`NotificationService`] consumers.
#[derive(Debug)]
enum InnerNotificationEvent {
/// Validate inbound substream.
ValidateInboundSubstream {
/// Peer ID.
peer: PeerId,
/// Received handshake.
handshake: Vec<u8>,
/// `oneshot::Sender` for sending validation result back to `Notifications`
result_tx: oneshot::Sender<ValidationResult>,
},
/// Notification substream open to `peer`.
NotificationStreamOpened {
/// Peer ID.
peer: PeerId,
/// Direction of the substream.
direction: Direction,
/// Received handshake.
handshake: Vec<u8>,
/// Negotiated fallback.
negotiated_fallback: Option<ProtocolName>,
/// Notification sink.
sink: NotificationsSink,
},
/// Substream was closed.
NotificationStreamClosed {
/// Peer ID.
peer: PeerId,
},
/// Notification was received from the substream.
NotificationReceived {
/// Peer ID.
peer: PeerId,
/// Received notification.
notification: Vec<u8>,
},
/// Notification sink has been replaced.
NotificationSinkReplaced {
/// Peer ID.
peer: PeerId,
/// Notification sink.
sink: NotificationsSink,
},
}
/// Notification commands.
///
/// Sent by the installed protocols to `Notifications` to open/close/modify substreams.
#[derive(Debug)]
pub enum NotificationCommand {
/// Instruct `Notifications` to open a substream to peer.
#[allow(unused)]
OpenSubstream(PeerId),
/// Instruct `Notifications` to close the substream to peer.
#[allow(unused)]
CloseSubstream(PeerId),
/// Set handshake for the notifications protocol.
SetHandshake(Vec<u8>),
}
/// Context assigned to each peer.
///
/// Contains `NotificationsSink` used by [`NotificationService`] to send notifications
/// and an additional, distributable `NotificationsSink` which the protocol may acquire
/// if it wishes to send notifications through `NotificationsSink` directly.
///
/// The distributable `NoticationsSink` is wrapped in an `Arc<Mutex<>>` to allow
/// `NotificationsService` to swap the underlying sink in case it's replaced.
#[derive(Debug, Clone)]
struct PeerContext {
/// Sink for sending notificaitons.
sink: NotificationsSink,
/// Distributable notification sink.
shared_sink: NotificationSink,
}
/// Handle that is passed on to the notifications protocol.
#[derive(Debug)]
pub struct NotificationHandle {
/// Protocol name.
protocol: ProtocolName,
/// TX channel for sending commands to `Notifications`.
tx: mpsc::Sender<NotificationCommand>,
/// RX channel for receiving events from `Notifications`.
rx: TracingUnboundedReceiver<InnerNotificationEvent>,
/// All subscribers of `NotificationEvent`s.
subscribers: Subscribers,
/// Connected peers.
peers: HashMap<PeerId, PeerContext>,
}
impl NotificationHandle {
/// Create new [`NotificationHandle`].
fn new(
protocol: ProtocolName,
tx: mpsc::Sender<NotificationCommand>,
rx: TracingUnboundedReceiver<InnerNotificationEvent>,
subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
) -> Self {
Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
}
}
#[async_trait::async_trait]
impl NotificationService for NotificationHandle {
/// Instruct `Notifications` to open a new substream for `peer`.
async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
todo!("support for opening substreams not implemented yet");
}
/// Instruct `Notifications` to close substream for `peer`.
async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
}
/// Send synchronous `notification` to `peer`.
fn send_sync_notification(&self, peer: &PeerId, notification: Vec<u8>) {
if let Some(info) = self.peers.get(&peer) {
metrics::register_notification_sent(
&info.sink.metrics(),
&self.protocol,
notification.len(),
);
let _ = info.sink.send_sync_notification(notification);
}
}
/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
async fn send_async_notification(
&self,
peer: &PeerId,
notification: Vec<u8>,
) -> Result<(), error::Error> {
let notification_len = notification.len();
let sink = &self.peers.get(&peer).ok_or_else(|| error::Error::PeerDoesntExist(*peer))?.sink;
sink.reserve_notification()
.await
.map_err(|_| error::Error::ConnectionClosed)?
.send(notification)
.map_err(|_| error::Error::ChannelClosed)
.map(|res| {
metrics::register_notification_sent(
&sink.metrics(),
&self.protocol,
notification_len,
);
res
})
}
/// Set handshake for the notification protocol replacing the old handshake.
async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
}
/// Non-blocking variant of `set_handshake()` that attempts to update the handshake
/// and returns an error if the channel is blocked.
///
/// Technically the function can return an error if the channel to `Notifications` is closed
/// but that doesn't happen under normal operation.
fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
}
/// Get next event from the `Notifications` event stream.
async fn next_event(&mut self) -> Option<NotificationEvent> {
loop {
match self.rx.next().await? {
InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } =>
return Some(NotificationEvent::ValidateInboundSubstream {
peer,
handshake,
result_tx,
}),
InnerNotificationEvent::NotificationStreamOpened {
peer,
handshake,
negotiated_fallback,
direction,
sink,
} => {
self.peers.insert(
peer,
PeerContext {
sink: sink.clone(),
shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
},
);
return Some(NotificationEvent::NotificationStreamOpened {
peer,
handshake,
direction,
negotiated_fallback,
})
},
InnerNotificationEvent::NotificationStreamClosed { peer } => {
self.peers.remove(&peer);
return Some(NotificationEvent::NotificationStreamClosed { peer })
},
InnerNotificationEvent::NotificationReceived { peer, notification } =>
return Some(NotificationEvent::NotificationReceived { peer, notification }),
InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
match self.peers.get_mut(&peer) {
None => log::error!(
"{}: notification sink replaced for {peer} but peer does not exist",
self.protocol
),
Some(context) => {
context.sink = sink.clone();
*context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
},
}
},
}
}
}
// Clone [`NotificationService`]
fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
let mut subscribers = self.subscribers.lock();
let (event_tx, event_rx) = tracing_unbounded("mpsc-notification-to-protocol", 100_000);
subscribers.push(event_tx);
Ok(Box::new(NotificationHandle {
protocol: self.protocol.clone(),
tx: self.tx.clone(),
rx: event_rx,
peers: self.peers.clone(),
subscribers: self.subscribers.clone(),
}))
}
/// Get protocol name.
fn protocol(&self) -> &ProtocolName {
&self.protocol
}
/// Get message sink of the peer.
fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>> {
match self.peers.get(peer) {
Some(context) => Some(Box::new(context.shared_sink.clone())),
None => None,
}
}
}
/// Channel pair which allows `Notifications` to interact with a protocol.
#[derive(Debug)]
pub struct ProtocolHandlePair {
/// Protocol name.
protocol: ProtocolName,
/// Subscribers of the notification protocol events.
subscribers: Subscribers,
// Receiver for notification commands received from the protocol implementation.
rx: mpsc::Receiver<NotificationCommand>,
}
impl ProtocolHandlePair {
/// Create new [`ProtocolHandlePair`].
fn new(
protocol: ProtocolName,
subscribers: Subscribers,
rx: mpsc::Receiver<NotificationCommand>,
) -> Self {
Self { protocol, subscribers, rx }
}
/// Consume `self` and split [`ProtocolHandlePair`] into a handle which allows it to send events
/// to the protocol and a stream of commands received from the protocol.
pub(crate) fn split(
self,
) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
(
ProtocolHandle::new(self.protocol, self.subscribers),
Box::new(ReceiverStream::new(self.rx)),
)
}
}
/// Handle that is passed on to `Notifications` and allows it to directly communicate
/// with the protocol.
#[derive(Debug, Clone)]
pub(crate) struct ProtocolHandle {
/// Protocol name.
protocol: ProtocolName,
/// Subscribers of the notification protocol.
subscribers: Subscribers,
/// Number of connected peers.
num_peers: usize,
/// Delegate validation to `Peerset`.
delegate_to_peerset: bool,
/// Prometheus metrics.
metrics: Option<metrics::Metrics>,
}
pub(crate) enum ValidationCallResult {
WaitForValidation(oneshot::Receiver<ValidationResult>),
Delegated,
}
impl ProtocolHandle {
/// Create new [`ProtocolHandle`].
fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
}
/// Set metrics.
pub fn set_metrics(&mut self, metrics: Option<metrics::Metrics>) {
self.metrics = metrics;
}
/// Delegate validation to `Peerset`.
///
/// Protocols that do not do any validation themselves and only rely on `Peerset` handling
/// validation can disable protocol-side validation entirely by delegating all validation to
/// `Peerset`.
pub fn delegate_to_peerset(&mut self, delegate: bool) {
self.delegate_to_peerset = delegate;
}
/// Report to the protocol that a substream has been opened and it must be validated by the
/// protocol.
///
/// Return `oneshot::Receiver` which allows `Notifications` to poll for the validation result
/// from protocol.
pub fn report_incoming_substream(
&self,
peer: PeerId,
handshake: Vec<u8>,
) -> Result<ValidationCallResult, ()> {
let subscribers = self.subscribers.lock();
log::trace!(
target: LOG_TARGET,
"{}: report incoming substream for {peer}, handshake {handshake:?}",
self.protocol
);
if self.delegate_to_peerset {
return Ok(ValidationCallResult::Delegated)
}
// if there is only one subscriber, `Notifications` can wait directly on the
// `oneshot::channel()`'s RX half without indirection
if subscribers.len() == 1 {
let (result_tx, rx) = oneshot::channel();
return subscribers[0]
.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
peer,
handshake,
result_tx,
})
.map(|_| ValidationCallResult::WaitForValidation(rx))
.map_err(|_| ())
}
// if there are multiple subscribers, create a task which waits for all of the
// validations to finish and returns the combined result to `Notifications`
let mut results: FuturesUnordered<_> = subscribers
.iter()
.filter_map(|subscriber| {
let (result_tx, rx) = oneshot::channel();
subscriber
.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
peer,
handshake: handshake.clone(),
result_tx,
})
.is_ok()
.then_some(rx)
})
.collect();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
while let Some(event) = results.next().await {
match event {
Err(_) | Ok(ValidationResult::Reject) =>
return tx.send(ValidationResult::Reject),
Ok(ValidationResult::Accept) => {},
}
}
return tx.send(ValidationResult::Accept)
});
Ok(ValidationCallResult::WaitForValidation(rx))
}
/// Report to the protocol that a substream has been opened and that it can now use the handle
/// to send notifications to the remote peer.
pub fn report_substream_opened(
&mut self,
peer: PeerId,
direction: Direction,
handshake: Vec<u8>,
negotiated_fallback: Option<ProtocolName>,
sink: NotificationsSink,
) -> Result<(), ()> {
metrics::register_substream_opened(&self.metrics, &self.protocol);
let mut subscribers = self.subscribers.lock();
log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
subscribers.retain(|subscriber| {
subscriber
.unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
peer,
direction,
handshake: handshake.clone(),
negotiated_fallback: negotiated_fallback.clone(),
sink: sink.clone(),
})
.is_ok()
});
self.num_peers += 1;
Ok(())
}
/// Substream was closed.
pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
metrics::register_substream_closed(&self.metrics, &self.protocol);
let mut subscribers = self.subscribers.lock();
log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
subscribers.retain(|subscriber| {
subscriber
.unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
.is_ok()
});
self.num_peers -= 1;
Ok(())
}
/// Notification was received from the substream.
pub fn report_notification_received(
&mut self,
peer: PeerId,
notification: Vec<u8>,
) -> Result<(), ()> {
metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
let mut subscribers = self.subscribers.lock();
log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
subscribers.retain(|subscriber| {
subscriber
.unbounded_send(InnerNotificationEvent::NotificationReceived {
peer,
notification: notification.clone(),
})
.is_ok()
});
Ok(())
}
/// Notification sink was replaced.
pub fn report_notification_sink_replaced(
&mut self,
peer: PeerId,
sink: NotificationsSink,
) -> Result<(), ()> {
let mut subscribers = self.subscribers.lock();
log::trace!(
target: LOG_TARGET,
"{}: notification sink replaced for {peer:?}",
self.protocol
);
subscribers.retain(|subscriber| {
subscriber
.unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
peer,
sink: sink.clone(),
})
.is_ok()
});
Ok(())
}
/// Get the number of connected peers.
pub fn num_peers(&self) -> usize {
self.num_peers
}
}
/// Create new (protocol, notification) handle pair.
///
/// Handle pair allows `Notifications` and the protocol to communicate with each other directly.
pub fn notification_service(
protocol: ProtocolName,
) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
let (event_tx, event_rx) = tracing_unbounded("mpsc-notification-to-protocol", 100_000);
let subscribers = Arc::new(Mutex::new(vec![event_tx]));
(
ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
)
}
@@ -0,0 +1,839 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::*;
use crate::protocol::notifications::handler::{
NotificationsSinkMessage, ASYNC_NOTIFICATIONS_BUFFER_SIZE,
};
use std::future::Future;
#[tokio::test]
async fn validate_and_accept_substream() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (handle, _stream) = proto.split();
let peer_id = PeerId::random();
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
}
#[tokio::test]
async fn substream_opened() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, _, _) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
}
#[tokio::test]
async fn send_sync_notification() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, _, mut sync_rx) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that a substream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 8]);
assert_eq!(
sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] })
);
}
#[tokio::test]
async fn send_async_notification() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, mut async_rx, _) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that a substream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
notif.send_async_notification(&peer_id, vec![1, 3, 3, 9]).await.unwrap();
assert_eq!(
async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] })
);
}
#[tokio::test]
async fn send_sync_notification_to_non_existent_peer() {
let (proto, notif) = notification_service("/proto/1".into());
let (_sink, _, _sync_rx) = NotificationsSink::new(PeerId::random());
let (_handle, _stream) = proto.split();
let peer = PeerId::random();
// as per the original implementation, the call doesn't fail
notif.send_sync_notification(&peer, vec![1, 3, 3, 7])
}
#[tokio::test]
async fn send_async_notification_to_non_existent_peer() {
let (proto, notif) = notification_service("/proto/1".into());
let (_sink, _, _sync_rx) = NotificationsSink::new(PeerId::random());
let (_handle, _stream) = proto.split();
let peer = PeerId::random();
if let Err(error::Error::PeerDoesntExist(peer_id)) =
notif.send_async_notification(&peer, vec![1, 3, 3, 7]).await
{
assert_eq!(peer, peer_id);
} else {
panic!("invalid error received from `send_async_notification()`");
}
}
#[tokio::test]
async fn receive_notification() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, _, _sync_rx) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that a substream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
// notification is received
handle.report_notification_received(peer_id, vec![1, 3, 3, 8]).unwrap();
if let Some(NotificationEvent::NotificationReceived { peer, notification }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(notification, vec![1, 3, 3, 8]);
} else {
panic!("invalid event received");
}
}
#[tokio::test]
async fn backpressure_works() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, mut async_rx, _) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that a substream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
// fill the message buffer with messages
for i in 0..=ASYNC_NOTIFICATIONS_BUFFER_SIZE {
assert!(futures::poll!(notif.send_async_notification(&peer_id, vec![1, 3, 3, i as u8]))
.is_ready());
}
// try to send one more message and verify that the call blocks
assert!(futures::poll!(notif.send_async_notification(&peer_id, vec![1, 3, 3, 9])).is_pending());
// release one slot from the buffer for new message
assert_eq!(
async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 0] })
);
// verify that a message can be sent
assert!(futures::poll!(notif.send_async_notification(&peer_id, vec![1, 3, 3, 9])).is_ready());
}
#[tokio::test]
async fn peer_disconnects_then_sync_notification_is_sent() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, _, sync_rx) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that a substream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
// report that a substream has been closed but don't poll `notif` to receive this
// information
handle.report_substream_closed(peer_id).unwrap();
drop(sync_rx);
// as per documentation, error is not reported but the notification is silently dropped
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 7]);
}
#[tokio::test]
async fn peer_disconnects_then_async_notification_is_sent() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, async_rx, _) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that a substream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
// report that a substream has been closed but don't poll `notif` to receive this
// information
handle.report_substream_closed(peer_id).unwrap();
drop(async_rx);
// as per documentation, error is not reported but the notification is silently dropped
if let Err(error::Error::ConnectionClosed) =
notif.send_async_notification(&peer_id, vec![1, 3, 3, 7]).await
{
} else {
panic!("invalid state after calling `send_async_notificatio()` on closed connection")
}
}
#[tokio::test]
async fn cloned_service_opening_substream_works() {
let (proto, mut notif1) = notification_service("/proto/1".into());
let (_sink, _async_rx, _) = NotificationsSink::new(PeerId::random());
let (handle, _stream) = proto.split();
let mut notif2 = notif1.clone().unwrap();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(mut result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
// verify that `notif1` gets the event
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif1.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
// verify that because only one listener has thus far send their result, the result is
// pending
assert!(result_rx.try_recv().is_err());
// verify that `notif2` also gets the event
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif2.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
}
#[tokio::test]
async fn cloned_service_one_service_rejects_substream() {
let (proto, mut notif1) = notification_service("/proto/1".into());
let (_sink, _async_rx, _) = NotificationsSink::new(PeerId::random());
let (handle, _stream) = proto.split();
let mut notif2 = notif1.clone().unwrap();
let mut notif3 = notif2.clone().unwrap();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(mut result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
for notif in vec![&mut notif1, &mut notif2] {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
}
// `notif3` has not yet sent their validation result
assert!(result_rx.try_recv().is_err());
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif3.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Reject).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Reject);
}
#[tokio::test]
async fn cloned_service_opening_substream_sending_and_receiving_notifications_work() {
let (proto, mut notif1) = notification_service("/proto/1".into());
let (sink, _, mut sync_rx) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let mut notif2 = notif1.clone().unwrap();
let mut notif3 = notif1.clone().unwrap();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
for notif in vec![&mut notif1, &mut notif2, &mut notif3] {
// accept the inbound substream for all services
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that then notification stream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
for notif in vec![&mut notif1, &mut notif2, &mut notif3] {
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
}
// receive a notification from peer and verify all services receive it
handle.report_notification_received(peer_id, vec![1, 3, 3, 8]).unwrap();
for notif in vec![&mut notif1, &mut notif2, &mut notif3] {
if let Some(NotificationEvent::NotificationReceived { peer, notification }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(notification, vec![1, 3, 3, 8]);
} else {
panic!("invalid event received");
}
}
for (i, notif) in vec![&mut notif1, &mut notif2, &mut notif3].iter().enumerate() {
// send notification from each service and verify peer receives it
notif.send_sync_notification(&peer_id, vec![1, 3, 3, i as u8]);
assert_eq!(
sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, i as u8] })
);
}
// close the substream for peer and verify all services receive the event
handle.report_substream_closed(peer_id).unwrap();
for notif in vec![&mut notif1, &mut notif2, &mut notif3] {
if let Some(NotificationEvent::NotificationStreamClosed { peer }) = notif.next_event().await
{
assert_eq!(peer_id, peer);
} else {
panic!("invalid event received");
}
}
}
#[tokio::test]
async fn sending_notifications_using_notifications_sink_works() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, mut async_rx, mut sync_rx) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that a substream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
// get a copy of the notification sink and send a synchronous notification using.
let sink = notif.message_sink(&peer_id).unwrap();
sink.send_sync_notification(vec![1, 3, 3, 6]);
// send an asynchronous notification using the acquired notifications sink.
let _ = sink.send_async_notification(vec![1, 3, 3, 7]).await.unwrap();
assert_eq!(
sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 6] }),
);
assert_eq!(
async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 7] }),
);
// send notifications using the stored notification sink as well.
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id, vec![1, 3, 3, 9]).await.unwrap();
assert_eq!(
sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] }),
);
assert_eq!(
async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] }),
);
}
#[test]
fn try_to_get_notifications_sink_for_non_existent_peer() {
let (_proto, notif) = notification_service("/proto/1".into());
assert!(notif.message_sink(&PeerId::random()).is_none());
}
#[tokio::test]
async fn notification_sink_replaced() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (sink, mut async_rx, mut sync_rx) = NotificationsSink::new(PeerId::random());
let (mut handle, _stream) = proto.split();
let peer_id = PeerId::random();
// validate inbound substream
let ValidationCallResult::WaitForValidation(result_rx) =
handle.report_incoming_substream(peer_id, vec![1, 3, 3, 7]).unwrap()
else {
panic!("peerset not enabled");
};
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
panic!("invalid event received");
}
assert_eq!(result_rx.await.unwrap(), ValidationResult::Accept);
// report that a substream has been opened
handle
.report_substream_opened(peer_id, Direction::Inbound, vec![1, 3, 3, 7], None, sink)
.unwrap();
if let Some(NotificationEvent::NotificationStreamOpened {
peer,
negotiated_fallback,
handshake,
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
} else {
panic!("invalid event received");
}
// get a copy of the notification sink and send a synchronous notification using.
let sink = notif.message_sink(&peer_id).unwrap();
sink.send_sync_notification(vec![1, 3, 3, 6]);
// send an asynchronous notification using the acquired notifications sink.
let _ = sink.send_async_notification(vec![1, 3, 3, 7]).await.unwrap();
assert_eq!(
sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 6] }),
);
assert_eq!(
async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 7] }),
);
// send notifications using the stored notification sink as well.
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id, vec![1, 3, 3, 9]).await.unwrap();
assert_eq!(
sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] }),
);
assert_eq!(
async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] }),
);
// the initial connection was closed and `Notifications` switched to secondary connection
// and emitted `CustomProtocolReplaced` which informs the local `NotificationService` that
// the notification sink was replaced.
let (new_sink, mut new_async_rx, mut new_sync_rx) = NotificationsSink::new(PeerId::random());
handle.report_notification_sink_replaced(peer_id, new_sink).unwrap();
// drop the old sinks and poll `notif` once to register the sink replacement
drop(sync_rx);
drop(async_rx);
futures::future::poll_fn(|cx| {
let _ = std::pin::Pin::new(&mut notif.next_event()).poll(cx);
std::task::Poll::Ready(())
})
.await;
// verify that using the `NotificationService` API automatically results in using the correct
// sink
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id, vec![1, 3, 3, 9]).await.unwrap();
assert_eq!(
new_sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] }),
);
assert_eq!(
new_async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] }),
);
// now send two notifications using the acquired message sink and verify that
// it's also updated
sink.send_sync_notification(vec![1, 3, 3, 6]);
// send an asynchronous notification using the acquired notifications sink.
let _ = sink.send_async_notification(vec![1, 3, 3, 7]).await.unwrap();
assert_eq!(
new_sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 6] }),
);
assert_eq!(
new_async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 7] }),
);
}
#[tokio::test]
async fn set_handshake() {
let (proto, mut notif) = notification_service("/proto/1".into());
let (_handle, mut stream) = proto.split();
assert!(notif.try_set_handshake(vec![1, 3, 3, 7]).is_ok());
match stream.next().await {
Some(NotificationCommand::SetHandshake(handshake)) => {
assert_eq!(handshake, vec![1, 3, 3, 7]);
},
_ => panic!("invalid event received"),
}
for _ in 0..COMMAND_QUEUE_SIZE {
assert!(notif.try_set_handshake(vec![1, 3, 3, 7]).is_ok());
}
assert!(notif.try_set_handshake(vec![1, 3, 3, 7]).is_err());
}
@@ -22,6 +22,7 @@ use crate::{
peer_store::PeerStore,
protocol::notifications::{Notifications, NotificationsOut, ProtocolConfig},
protocol_controller::{ProtoSetConfig, ProtocolController, SetId},
service::traits::{NotificationEvent, ValidationResult},
};
use futures::{future::BoxFuture, prelude::*};
@@ -70,6 +71,8 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
.timeout(Duration::from_secs(20))
.boxed();
let (protocol_handle_pair, mut notif_service) =
crate::protocol::notifications::service::notification_service("/foo".into());
let peer_store = PeerStore::new(if index == 0 {
keypairs.iter().skip(1).map(|keypair| keypair.public().to_peer_id()).collect()
} else {
@@ -91,16 +94,22 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
Box::new(peer_store.handle()),
);
let (notif_handle, command_stream) = protocol_handle_pair.split();
let behaviour = CustomProtoWithAddr {
inner: Notifications::new(
vec![controller_handle],
from_controller,
iter::once(ProtocolConfig {
name: "/foo".into(),
fallback_names: Vec::new(),
handshake: Vec::new(),
max_notification_size: 1024 * 1024,
}),
&None,
iter::once((
ProtocolConfig {
name: "/foo".into(),
fallback_names: Vec::new(),
handshake: Vec::new(),
max_notification_size: 1024 * 1024,
},
notif_handle,
command_stream,
)),
),
peer_store_future: peer_store.run().boxed(),
protocol_controller_future: controller.run().boxed(),
@@ -118,6 +127,16 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
};
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.spawn(async move {
loop {
if let NotificationEvent::ValidateInboundSubstream { result_tx, .. } =
notif_service.next_event().await.unwrap()
{
result_tx.send(ValidationResult::Accept).unwrap();
}
}
});
let mut swarm = SwarmBuilder::with_executor(
transport,
behaviour,
@@ -847,6 +847,7 @@ mod tests {
use super::*;
use crate::{peer_store::PeerStoreProvider, ReputationChange};
use libp2p::PeerId;
use sc_network_common::role::ObservedRole;
use sc_utils::mpsc::{tracing_unbounded, TryRecvError};
use std::collections::HashSet;
@@ -858,8 +859,10 @@ mod tests {
fn is_banned(&self, peer_id: &PeerId) -> bool;
fn register_protocol(&self, protocol_handle: ProtocolHandle);
fn report_disconnect(&mut self, peer_id: PeerId);
fn set_peer_role(&mut self, peer_id: &PeerId, role: ObservedRole);
fn report_peer(&mut self, peer_id: PeerId, change: ReputationChange);
fn peer_reputation(&self, peer_id: &PeerId) -> i32;
fn peer_role(&self, peer_id: &PeerId) -> Option<ObservedRole>;
fn outgoing_candidates<'a>(&self, count: usize, ignored: HashSet<&'a PeerId>) -> Vec<PeerId>;
}
}
+66 -152
View File
@@ -54,6 +54,7 @@ use crate::{
ReputationChange,
};
use codec::DecodeAll;
use either::Either;
use futures::{channel::oneshot, prelude::*};
#[allow(deprecated)]
@@ -71,10 +72,13 @@ use libp2p::{
Multiaddr, PeerId,
};
use log::{debug, error, info, trace, warn};
use metrics::{Histogram, HistogramVec, MetricSources, Metrics};
use metrics::{Histogram, MetricSources, Metrics};
use parking_lot::Mutex;
use sc_network_common::ExHashT;
use sc_network_common::{
role::{ObservedRole, Roles},
ExHashT,
};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;
@@ -118,12 +122,6 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
bandwidth: Arc<transport::BandwidthSinks>,
/// Channel that sends messages to the actual worker.
to_worker: TracingUnboundedSender<ServiceToWorkerMsg>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Updated by the [`NetworkWorker`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ProtocolName), NotificationsSink>>>,
/// Field extracted from the [`Metrics`] struct and necessary to report the
/// notifications-related metrics.
notifications_sizes_metric: Option<HistogramVec>,
/// Protocol name -> `SetId` mapping for notification protocols. The map never changes after
/// initialization.
notification_protocol_ids: HashMap<ProtocolName, SetId>,
@@ -132,6 +130,8 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
protocol_handles: Vec<protocol_controller::ProtocolHandle>,
/// Shortcut to sync protocol handle (`protocol_handles[0]`).
sync_protocol_handle: protocol_controller::ProtocolHandle,
/// Handle to `PeerStore`.
peer_store_handle: PeerStoreHandle,
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
@@ -199,7 +199,7 @@ where
)?;
for notification_protocol in &notification_protocols {
ensure_addresses_consistent_with_transport(
notification_protocol.set_config.reserved_nodes.iter().map(|x| &x.multiaddr),
notification_protocol.set_config().reserved_nodes.iter().map(|x| &x.multiaddr),
&network_config.transport,
)?;
}
@@ -241,7 +241,7 @@ where
.map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::MAX));
let notifs_max = notification_protocols
.iter()
.map(|cfg| usize::try_from(cfg.max_notification_size).unwrap_or(usize::MAX));
.map(|cfg| usize::try_from(cfg.max_notification_size()).unwrap_or(usize::MAX));
// A "default" max is added to cover all the other protocols: ping, identify,
// kademlia, block announces, and transactions.
@@ -273,7 +273,7 @@ where
// We must prepend a hardcoded default peer set to notification protocols.
let all_peer_sets_iter = iter::once(&network_config.default_peers_set)
.chain(notification_protocols.iter().map(|protocol| &protocol.set_config));
.chain(notification_protocols.iter().map(|protocol| protocol.set_config()));
let (protocol_handles, protocol_controllers): (Vec<_>, Vec<_>) = all_peer_sets_iter
.enumerate()
@@ -312,21 +312,9 @@ where
iter::once(&params.block_announce_config)
.chain(notification_protocols.iter())
.enumerate()
.map(|(index, protocol)| {
(protocol.notifications_protocol.clone(), SetId::from(index))
})
.map(|(index, protocol)| (protocol.protocol_name().clone(), SetId::from(index)))
.collect();
let protocol = Protocol::new(
From::from(&params.role),
notification_protocols.clone(),
params.block_announce_config,
params.peer_store.clone(),
protocol_handles.clone(),
from_protocol_controllers,
params.tx,
)?;
let known_addresses = {
// Collect all reserved nodes and bootnodes addresses.
let mut addresses: Vec<_> = network_config
@@ -336,7 +324,7 @@ where
.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
.chain(notification_protocols.iter().flat_map(|protocol| {
protocol
.set_config
.set_config()
.reserved_nodes
.iter()
.map(|reserved| (reserved.peer_id, reserved.multiaddr.clone()))
@@ -389,6 +377,16 @@ where
let num_connected = Arc::new(AtomicUsize::new(0));
let external_addresses = Arc::new(Mutex::new(HashSet::new()));
let (protocol, notif_protocol_handles) = Protocol::new(
From::from(&params.role),
&params.metrics_registry,
notification_protocols,
params.block_announce_config,
params.peer_store.clone(),
protocol_handles.clone(),
from_protocol_controllers,
)?;
// Build the swarm.
let (mut swarm, bandwidth): (Swarm<Behaviour<B>>, _) = {
let user_agent =
@@ -508,7 +506,6 @@ where
}
let listen_addresses = Arc::new(Mutex::new(HashSet::new()));
let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new()));
let service = Arc::new(NetworkService {
bandwidth,
@@ -518,13 +515,10 @@ where
local_peer_id,
local_identity,
to_worker,
peers_notifications_sinks: peers_notifications_sinks.clone(),
notifications_sizes_metric: metrics
.as_ref()
.map(|metrics| metrics.notifications_sizes.clone()),
notification_protocol_ids,
protocol_handles,
sync_protocol_handle,
peer_store_handle: params.peer_store.clone(),
_marker: PhantomData,
_block: Default::default(),
});
@@ -539,8 +533,8 @@ where
metrics,
boot_node_ids,
reported_invalid_boot_nodes: Default::default(),
peers_notifications_sinks,
peer_store_handle: params.peer_store,
notif_protocol_handles,
_marker: Default::default(),
_block: Default::default(),
})
@@ -567,7 +561,7 @@ where
/// Returns the number of peers we're connected to.
pub fn num_connected_peers(&self) -> usize {
self.network_service.behaviour().user_protocol().num_connected_peers()
self.network_service.behaviour().user_protocol().num_sync_peers()
}
/// Adds an address for a node.
@@ -991,6 +985,16 @@ where
fn sync_num_connected(&self) -> usize {
self.num_connected.load(Ordering::Relaxed)
}
fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
match Roles::decode_all(&mut &handshake[..]) {
Ok(role) => Some(role.into()),
Err(_) => {
log::debug!(target: "sub-libp2p", "handshake doesn't contain peer role: {handshake:?}");
self.peer_store_handle.peer_role(&peer_id)
},
}
}
}
impl<B, H> NetworkEventStream for NetworkService<B, H>
@@ -1010,68 +1014,20 @@ where
B: BlockT + 'static,
H: ExHashT,
{
fn write_notification(&self, target: PeerId, protocol: ProtocolName, message: Vec<u8>) {
// We clone the `NotificationsSink` in order to be able to unlock the network-wide
// `peers_notifications_sinks` mutex as soon as possible.
let sink = {
let peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) {
sink.clone()
} else {
// Notification silently discarded, as documented.
debug!(
target: "sub-libp2p",
"Attempted to send notification on missing or closed substream: {}, {:?}",
target, protocol,
);
return
}
};
if let Some(notifications_sizes_metric) = self.notifications_sizes_metric.as_ref() {
notifications_sizes_metric
.with_label_values(&["out", &protocol])
.observe(message.len() as f64);
}
// Sending is communicated to the `NotificationsSink`.
trace!(
target: "sub-libp2p",
"External API => Notification({:?}, {:?}, {} bytes)",
target, protocol, message.len()
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);
sink.send_sync_notification(message);
fn write_notification(&self, _target: PeerId, _protocol: ProtocolName, _message: Vec<u8>) {
unimplemented!();
}
fn notification_sender(
&self,
target: PeerId,
protocol: ProtocolName,
_target: PeerId,
_protocol: ProtocolName,
) -> Result<Box<dyn NotificationSenderT>, NotificationSenderError> {
// We clone the `NotificationsSink` in order to be able to unlock the network-wide
// `peers_notifications_sinks` mutex as soon as possible.
let sink = {
let peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) {
sink.clone()
} else {
return Err(NotificationSenderError::Closed)
}
};
let notification_size_metric = self
.notifications_sizes_metric
.as_ref()
.map(|histogram| histogram.with_label_values(&["out", &protocol]));
Ok(Box::new(NotificationSender { sink, protocol_name: protocol, notification_size_metric }))
unimplemented!();
}
fn set_notification_handshake(&self, protocol: ProtocolName, handshake: Vec<u8>) {
let _ = self
.to_worker
.unbounded_send(ServiceToWorkerMsg::SetNotificationHandshake(protocol, handshake));
fn set_notification_handshake(&self, _protocol: ProtocolName, _handshake: Vec<u8>) {
unimplemented!();
}
}
@@ -1209,7 +1165,6 @@ enum ServiceToWorkerMsg {
pending_response: oneshot::Sender<Result<NetworkState, RequestFailure>>,
},
DisconnectPeer(PeerId, ProtocolName),
SetNotificationHandshake(ProtocolName, Vec<u8>),
}
/// Main network worker. Must be polled in order for the network to advance.
@@ -1239,11 +1194,10 @@ where
boot_node_ids: Arc<HashMap<PeerId, Vec<Multiaddr>>>,
/// Boot nodes that we already have reported as invalid.
reported_invalid_boot_nodes: HashSet<PeerId>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Shared with the [`NetworkService`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ProtocolName), NotificationsSink>>>,
/// Peer reputation store handle.
peer_store_handle: PeerStoreHandle,
/// Notification protocol handles.
notif_protocol_handles: Vec<protocol::ProtocolHandle>,
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
@@ -1282,8 +1236,7 @@ where
};
// Update the `num_connected` count shared with the `NetworkService`.
let num_connected_peers =
self.network_service.behaviour_mut().user_protocol_mut().num_connected_peers();
let num_connected_peers = self.network_service.behaviour().user_protocol().num_sync_peers();
self.num_connected.store(num_connected_peers, Ordering::Relaxed);
if let Some(metrics) = self.metrics.as_ref() {
@@ -1353,11 +1306,6 @@ where
.behaviour_mut()
.user_protocol_mut()
.disconnect_peer(&who, protocol_name),
ServiceToWorkerMsg::SetNotificationHandshake(protocol, handshake) => self
.network_service
.behaviour_mut()
.user_protocol_mut()
.set_notification_handshake(protocol, handshake),
}
}
@@ -1472,47 +1420,27 @@ where
},
SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
remote,
protocol,
set_id,
direction,
negotiated_fallback,
notifications_sink,
role,
received_handshake,
}) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics
.notifications_streams_opened_total
.with_label_values(&[&protocol])
.inc();
}
{
let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
let _previous_value = peers_notifications_sinks
.insert((remote, protocol.clone()), notifications_sink);
debug_assert!(_previous_value.is_none());
}
self.event_streams.send(Event::NotificationStreamOpened {
let _ = self.notif_protocol_handles[usize::from(set_id)].report_substream_opened(
remote,
protocol,
negotiated_fallback,
role,
direction,
received_handshake,
});
negotiated_fallback,
notifications_sink,
);
},
SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
remote,
protocol,
set_id,
notifications_sink,
}) => {
let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) {
*s = notifications_sink;
} else {
error!(
target: "sub-libp2p",
"NotificationStreamReplaced for non-existing substream"
);
debug_assert!(false);
}
let _ = self.notif_protocol_handles[usize::from(set_id)]
.report_notification_sink_replaced(remote, notifications_sink);
// TODO: Notifications might have been lost as a result of the previous
// connection being dropped, and as a result it would be preferable to notify
@@ -1535,31 +1463,17 @@ where
// role,
// });
},
SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, protocol }) => {
if let Some(metrics) = self.metrics.as_ref() {
metrics
.notifications_streams_closed_total
.with_label_values(&[&protocol[..]])
.inc();
}
self.event_streams
.send(Event::NotificationStreamClosed { remote, protocol: protocol.clone() });
{
let mut peers_notifications_sinks = self.peers_notifications_sinks.lock();
let _previous_value = peers_notifications_sinks.remove(&(remote, protocol));
debug_assert!(_previous_value.is_some());
}
SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, set_id }) => {
let _ = self.notif_protocol_handles[usize::from(set_id)]
.report_substream_closed(remote);
},
SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { remote, messages }) => {
if let Some(metrics) = self.metrics.as_ref() {
for (protocol, message) in &messages {
metrics
.notifications_sizes
.with_label_values(&["in", protocol])
.observe(message.len() as f64);
}
}
self.event_streams.send(Event::NotificationsReceived { remote, messages });
SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived {
remote,
set_id,
notification,
}) => {
let _ = self.notif_protocol_handles[usize::from(set_id)]
.report_notification_received(remote, notification);
},
SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => {
if let Some(metrics) = self.metrics.as_ref() {
@@ -61,9 +61,6 @@ pub struct Metrics {
pub kbuckets_num_nodes: GaugeVec<U64>,
pub listeners_local_addresses: Gauge<U64>,
pub listeners_errors_total: Counter<U64>,
pub notifications_sizes: HistogramVec,
pub notifications_streams_closed_total: CounterVec<U64>,
pub notifications_streams_opened_total: CounterVec<U64>,
pub peerset_num_discovered: Gauge<U64>,
pub pending_connections: Gauge<U64>,
pub pending_connections_errors_total: CounterVec<U64>,
@@ -153,31 +150,6 @@ impl Metrics {
"substrate_sub_libp2p_listeners_errors_total",
"Total number of non-fatal errors reported by a listener"
)?, registry)?,
notifications_sizes: prometheus::register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"substrate_sub_libp2p_notifications_sizes",
"Sizes of the notifications send to and received from all nodes"
),
buckets: prometheus::exponential_buckets(64.0, 4.0, 8)
.expect("parameters are always valid values; qed"),
},
&["direction", "protocol"]
)?, registry)?,
notifications_streams_closed_total: prometheus::register(CounterVec::new(
Opts::new(
"substrate_sub_libp2p_notifications_streams_closed_total",
"Total number of notification substreams that have been closed"
),
&["protocol"]
)?, registry)?,
notifications_streams_opened_total: prometheus::register(CounterVec::new(
Opts::new(
"substrate_sub_libp2p_notifications_streams_opened_total",
"Total number of notification substreams that have been opened"
),
&["protocol"]
)?, registry)?,
peerset_num_discovered: prometheus::register(Gauge::new(
"substrate_sub_libp2p_peerset_num_discovered",
"Number of nodes stored in the peerset manager",
@@ -18,6 +18,8 @@
//
// If you read this, you are very thorough, congratulations.
//! Signature-related code
use libp2p::{
identity::{Keypair, PublicKey},
PeerId,
+204 -1
View File
@@ -18,8 +18,11 @@
//
// If you read this, you are very thorough, congratulations.
//! Traits defined by `sc-network`.
use crate::{
config::MultiaddrWithPeerId,
error,
event::Event,
request_responses::{IfDisconnected, RequestFailure},
service::signature::Signature,
@@ -30,7 +33,9 @@ use crate::{
use futures::{channel::oneshot, Stream};
use libp2p::{Multiaddr, PeerId};
use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc};
use sc_network_common::role::ObservedRole;
use std::{collections::HashSet, fmt::Debug, future::Future, pin::Pin, sync::Arc};
pub use libp2p::{identity::SigningError, kad::record::Key as KademliaKey};
@@ -221,6 +226,14 @@ pub trait NetworkPeers {
/// Returns the number of peers in the sync peer set we're connected to.
fn sync_num_connected(&self) -> usize;
/// Attempt to get peer role.
///
/// Right now the peer role is decoded from the received handshake for all protocols
/// (`/block-announces/1` has other information as well). If the handshake cannot be
/// decoded into a role, the role queried from `PeerStore` and if the role is not stored
/// there either, `None` is returned and the peer should be discarded.
fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole>;
}
// Manual implementation to avoid extra boxing here
@@ -296,6 +309,10 @@ where
fn sync_num_connected(&self) -> usize {
T::sync_num_connected(self)
}
fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
T::peer_role(self, peer_id, handshake)
}
}
/// Provides access to network-level event stream.
@@ -611,3 +628,189 @@ where
T::new_best_block_imported(self, hash, number)
}
}
/// Substream acceptance result.
#[derive(Debug, PartialEq, Eq)]
pub enum ValidationResult {
/// Accept inbound substream.
Accept,
/// Reject inbound substream.
Reject,
}
/// Substream direction.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
/// Substream opened by the remote node.
Inbound,
/// Substream opened by the local node.
Outbound,
}
impl Direction {
/// Is the direction inbound.
pub fn is_inbound(&self) -> bool {
std::matches!(self, Direction::Inbound)
}
}
/// Events received by the protocol from `Notifications`.
#[derive(Debug)]
pub enum NotificationEvent {
/// Validate inbound substream.
ValidateInboundSubstream {
/// Peer ID.
peer: PeerId,
/// Received handshake.
handshake: Vec<u8>,
/// `oneshot::Sender` for sending validation result back to `Notifications`
result_tx: tokio::sync::oneshot::Sender<ValidationResult>,
},
/// Remote identified by `PeerId` opened a substream and sent `Handshake`.
/// Validate `Handshake` and report status (accept/reject) to `Notifications`.
NotificationStreamOpened {
/// Peer ID.
peer: PeerId,
/// Is the substream inbound or outbound.
direction: Direction,
/// Received handshake.
handshake: Vec<u8>,
/// Negotiated fallback.
negotiated_fallback: Option<ProtocolName>,
},
/// Substream was closed.
NotificationStreamClosed {
/// Peer Id.
peer: PeerId,
},
/// Notification was received from the substream.
NotificationReceived {
/// Peer ID.
peer: PeerId,
/// Received notification.
notification: Vec<u8>,
},
}
/// Notification service
///
/// Defines behaviors that both the protocol implementations and `Notifications` can expect from
/// each other.
///
/// `Notifications` can send two different kinds of information to protocol:
/// * substream-related information
/// * notification-related information
///
/// When an unvalidated, inbound substream is received by `Notifications`, it sends the inbound
/// stream information (peer ID, handshake) to protocol for validation. Protocol must then verify
/// that the handshake is valid (and in the future that it has a slot it can allocate for the peer)
/// and then report back the `ValidationResult` which is either `Accept` or `Reject`.
///
/// After the validation result has been received by `Notifications`, it prepares the
/// substream for communication by initializing the necessary sinks and emits
/// `NotificationStreamOpened` which informs the protocol that the remote peer is ready to receive
/// notifications.
///
/// Two different flavors of sending options are provided:
/// * synchronous sending ([`NotificationService::send_sync_notification()`])
/// * asynchronous sending ([`NotificationService::send_async_notification()`])
///
/// The former is used by the protocols not ready to exercise backpressure and the latter by the
/// protocols that can do it.
///
/// Both local and remote peer can close the substream at any time. Local peer can do so by calling
/// [`NotificationService::close_substream()`] which instructs `Notifications` to close the
/// substream. Remote closing the substream is indicated to the local peer by receiving
/// [`NotificationEvent::NotificationStreamClosed`] event.
///
/// In case the protocol must update its handshake while it's operating (such as updating the best
/// block information), it can do so by calling [`NotificationService::set_handshake()`]
/// which instructs `Notifications` to update the handshake it stored during protocol
/// initialization.
///
/// All peer events are multiplexed on the same incoming event stream from `Notifications` and thus
/// each event carries a `PeerId` so the protocol knows whose information to update when receiving
/// an event.
#[async_trait::async_trait]
pub trait NotificationService: Debug + Send {
/// Instruct `Notifications` to open a new substream for `peer`.
///
/// `dial_if_disconnected` informs `Notifications` whether to dial
// the peer if there is currently no active connection to it.
//
// NOTE: not offered by the current implementation
async fn open_substream(&mut self, peer: PeerId) -> Result<(), ()>;
/// Instruct `Notifications` to close substream for `peer`.
//
// NOTE: not offered by the current implementation
async fn close_substream(&mut self, peer: PeerId) -> Result<(), ()>;
/// Send synchronous `notification` to `peer`.
fn send_sync_notification(&self, peer: &PeerId, notification: Vec<u8>);
/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
///
/// Returns an error if the peer doesn't exist.
async fn send_async_notification(
&self,
peer: &PeerId,
notification: Vec<u8>,
) -> Result<(), error::Error>;
/// Set handshake for the notification protocol replacing the old handshake.
async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()>;
/// Non-blocking variant of `set_handshake()` that attempts to update the handshake
/// and returns an error if the channel is blocked.
///
/// Technically the function can return an error if the channel to `Notifications` is closed
/// but that doesn't happen under normal operation.
fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()>;
/// Get next event from the `Notifications` event stream.
async fn next_event(&mut self) -> Option<NotificationEvent>;
/// Make a copy of the object so it can be shared between protocol components
/// who wish to have access to the same underlying notification protocol.
fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()>;
/// Get protocol name of the `NotificationService`.
fn protocol(&self) -> &ProtocolName;
/// Get message sink of the peer.
fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>>;
}
/// Message sink for peers.
///
/// If protocol cannot use [`NotificationService`] to send notifications to peers and requires,
/// e.g., notifications to be sent in another task, the protocol may acquire a [`MessageSink`]
/// object for each peer by calling [`NotificationService::message_sink()`]. Calling this
/// function returns an object which allows the protocol to send notifications to the remote peer.
///
/// Use of this API is discouraged as it's not as performant as sending notifications through
/// [`NotificationService`] due to synchronization required to keep the underlying notification
/// sink up to date with possible sink replacement events.
#[async_trait::async_trait]
pub trait MessageSink: Send + Sync {
/// Send synchronous `notification` to the peer associated with this [`MessageSink`].
fn send_sync_notification(&self, notification: Vec<u8>);
/// Send an asynchronous `notification` to to the peer associated with this [`MessageSink`],
/// allowing sender to exercise backpressure.
///
/// Returns an error if the peer does not exist.
async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error>;
}