Integrate litep2p into Polkadot SDK (#2944)

[litep2p](https://github.com/altonen/litep2p) is a libp2p-compatible P2P
networking library. It supports all of the features of `rust-libp2p`
that are currently being utilized by Polkadot SDK.

Compared to `rust-libp2p`, `litep2p` has a quite different architecture
which is why the new `litep2p` network backend is only able to use a
little of the existing code in `sc-network`. The design has been mainly
influenced by how we'd wish to structure our networking-related code in
Polkadot SDK: independent higher-levels protocols directly communicating
with the network over links that support bidirectional backpressure. A
good example would be `NotificationHandle`/`RequestResponseHandle`
abstractions which allow, e.g., `SyncingEngine` to directly communicate
with peers to announce/request blocks.

I've tried running `polkadot --network-backend litep2p` with a few
different peer configurations and there is a noticeable reduction in
networking CPU usage. For high load (`--out-peers 200`), networking CPU
usage goes down from ~110% to ~30% (80 pp) and for normal load
(`--out-peers 40`), the usage goes down from ~55% to ~18% (37 pp).

These should not be taken as final numbers because:

a) there are still some low-hanging optimization fruits, such as
enabling [receive window
auto-tuning](https://github.com/libp2p/rust-yamux/pull/176), integrating
`Peerset` more closely with `litep2p` or improving memory usage of the
WebSocket transport
b) fixing bugs/instabilities that incorrectly cause `litep2p` to do less
work will increase the networking CPU usage
c) verification in a more diverse set of tests/conditions is needed

Nevertheless, these numbers should give an early estimate for CPU usage
of the new networking backend.

This PR consists of three separate changes:
* introduce a generic `PeerId` (wrapper around `Multihash`) so that we
don't have use `NetworkService::PeerId` in every part of the code that
uses a `PeerId`
* introduce `NetworkBackend` trait, implement it for the libp2p network
stack and make Polkadot SDK generic over `NetworkBackend`
  * implement `NetworkBackend` for litep2p

The new library should be considered experimental which is why
`rust-libp2p` will remain as the default option for the time being. This
PR currently depends on the master branch of `litep2p` but I'll cut a
new release for the library once all review comments have been
addresses.

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Co-authored-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Aaro Altonen
2024-04-08 19:44:13 +03:00
committed by GitHub
parent 9543d31474
commit 80616f6d03
181 changed files with 11055 additions and 1862 deletions
@@ -25,7 +25,7 @@ pub use self::{
service::{notification_service, ProtocolHandlePair},
};
pub(crate) use self::service::{metrics, ProtocolHandle};
pub(crate) use self::service::ProtocolHandle;
mod behaviour;
mod handler;
@@ -19,10 +19,13 @@
use crate::{
protocol::notifications::{
handler::{self, NotificationsSink, NotifsHandler, NotifsHandlerIn, NotifsHandlerOut},
service::{metrics, NotificationCommand, ProtocolHandle, ValidationCallResult},
service::{NotificationCommand, ProtocolHandle, ValidationCallResult},
},
protocol_controller::{self, IncomingIndex, Message, SetId},
service::traits::{Direction, ValidationResult},
service::{
metrics::NotificationMetrics,
traits::{Direction, ValidationResult},
},
types::ProtocolName,
};
@@ -167,7 +170,7 @@ pub struct Notifications {
pending_inbound_validations: FuturesUnordered<PendingInboundValidation>,
/// Metrics for notifications.
metrics: Option<metrics::Metrics>,
metrics: NotificationMetrics,
}
/// Configuration for a notifications protocol.
@@ -404,7 +407,7 @@ impl Notifications {
pub(crate) fn new(
protocol_controller_handles: Vec<protocol_controller::ProtocolHandle>,
from_protocol_controllers: TracingUnboundedReceiver<Message>,
metrics: Option<metrics::Metrics>,
metrics: NotificationMetrics,
notif_protocols: impl Iterator<
Item = (
ProtocolConfig,
@@ -1230,7 +1233,7 @@ impl NetworkBehaviour for Notifications {
send_back_addr: remote_addr.clone(),
},
self.notif_protocols.clone(),
self.metrics.clone(),
Some(self.metrics.clone()),
))
}
@@ -1245,7 +1248,7 @@ impl NetworkBehaviour for Notifications {
peer,
ConnectedPoint::Dialer { address: addr.clone(), role_override },
self.notif_protocols.clone(),
self.metrics.clone(),
Some(self.metrics.clone()),
))
}
@@ -2442,7 +2445,7 @@ mod tests {
reserved_only: false,
},
to_notifications,
Box::new(MockPeerStore {}),
Arc::new(MockPeerStore {}),
);
let (notif_handle, command_stream) = protocol_handle_pair.split();
@@ -2450,7 +2453,7 @@ mod tests {
Notifications::new(
vec![handle],
from_controller,
None,
NotificationMetrics::new(None),
iter::once((
ProtocolConfig {
name: "/foo".into(),
@@ -58,13 +58,11 @@
//! [`NotifsHandlerIn::Open`] has gotten an answer.
use crate::{
protocol::notifications::{
service::metrics,
upgrade::{
NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutSubstream,
UpgradeCollec,
},
protocol::notifications::upgrade::{
NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutSubstream,
UpgradeCollec,
},
service::metrics::NotificationMetrics,
types::ProtocolName,
};
@@ -131,7 +129,7 @@ pub struct NotifsHandler {
>,
/// Metrics.
metrics: Option<Arc<metrics::Metrics>>,
metrics: Option<Arc<NotificationMetrics>>,
}
impl NotifsHandler {
@@ -140,7 +138,7 @@ impl NotifsHandler {
peer_id: PeerId,
endpoint: ConnectedPoint,
protocols: Vec<ProtocolConfig>,
metrics: Option<metrics::Metrics>,
metrics: Option<NotificationMetrics>,
) -> Self {
Self {
protocols: protocols
@@ -345,7 +343,7 @@ pub enum NotifsHandlerOut {
#[derive(Debug, Clone)]
pub struct NotificationsSink {
inner: Arc<NotificationsSinkInner>,
metrics: Option<Arc<metrics::Metrics>>,
metrics: Option<Arc<NotificationMetrics>>,
}
impl NotificationsSink {
@@ -372,7 +370,7 @@ impl NotificationsSink {
}
/// Get reference to metrics.
pub fn metrics(&self) -> &Option<Arc<metrics::Metrics>> {
pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
&self.metrics
}
}
@@ -16,115 +16,40 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::types::ProtocolName;
use prometheus_endpoint::{
self as prometheus, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry,
U64,
};
use std::sync::Arc;
/// Notification metrics.
#[derive(Debug, Clone)]
pub struct Metrics {
// Total number of opened substreams.
pub notifications_streams_opened_total: CounterVec<U64>,
/// Total number of closed substreams.
pub notifications_streams_closed_total: CounterVec<U64>,
/// In/outbound notification sizes.
pub notifications_sizes: HistogramVec,
}
impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
notifications_sizes: prometheus::register(
HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"substrate_sub_libp2p_notifications_sizes",
"Sizes of the notifications send to and received from all nodes",
),
buckets: prometheus::exponential_buckets(64.0, 4.0, 8)
.expect("parameters are always valid values; qed"),
},
&["direction", "protocol"],
)?,
registry,
)?,
notifications_streams_closed_total: prometheus::register(
CounterVec::new(
Opts::new(
"substrate_sub_libp2p_notifications_streams_closed_total",
"Total number of notification substreams that have been closed",
),
&["protocol"],
)?,
registry,
)?,
notifications_streams_opened_total: prometheus::register(
CounterVec::new(
Opts::new(
"substrate_sub_libp2p_notifications_streams_opened_total",
"Total number of notification substreams that have been opened",
),
&["protocol"],
)?,
registry,
)?,
})
}
}
/// Register metrics.
pub fn register(registry: &Registry) -> Result<Metrics, PrometheusError> {
Metrics::register(registry)
}
use crate::{service::metrics::NotificationMetrics, types::ProtocolName};
/// Register opened substream to Prometheus.
pub fn register_substream_opened(metrics: &Option<Metrics>, protocol: &ProtocolName) {
pub fn register_substream_opened(metrics: &Option<NotificationMetrics>, protocol: &ProtocolName) {
if let Some(metrics) = metrics {
metrics.notifications_streams_opened_total.with_label_values(&[&protocol]).inc();
metrics.register_substream_opened(&protocol);
}
}
/// Register closed substream to Prometheus.
pub fn register_substream_closed(metrics: &Option<Metrics>, protocol: &ProtocolName) {
pub fn register_substream_closed(metrics: &Option<NotificationMetrics>, protocol: &ProtocolName) {
if let Some(metrics) = metrics {
metrics
.notifications_streams_closed_total
.with_label_values(&[&protocol[..]])
.inc();
metrics.register_substream_closed(&protocol);
}
}
/// Register sent notification to Prometheus.
pub fn register_notification_sent(
metrics: &Option<Arc<Metrics>>,
metrics: &Option<std::sync::Arc<NotificationMetrics>>,
protocol: &ProtocolName,
size: usize,
) {
if let Some(metrics) = metrics {
metrics
.notifications_sizes
.with_label_values(&["out", protocol])
.observe(size as f64);
metrics.register_notification_sent(protocol, size);
}
}
/// Register received notification to Prometheus.
pub fn register_notification_received(
metrics: &Option<Metrics>,
metrics: &Option<NotificationMetrics>,
protocol: &ProtocolName,
size: usize,
) {
if let Some(metrics) = metrics {
metrics
.notifications_sizes
.with_label_values(&["in", protocol])
.observe(size as f64);
metrics.register_notification_received(protocol, size);
}
}
@@ -21,17 +21,20 @@
use crate::{
error,
protocol::notifications::handler::NotificationsSink,
service::traits::{
Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
service::{
metrics::NotificationMetrics,
traits::{
Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
},
},
types::ProtocolName,
PeerId,
};
use futures::{
stream::{FuturesUnordered, Stream},
StreamExt,
};
use libp2p::PeerId;
use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
@@ -66,7 +69,7 @@ impl MessageSink for NotificationSink {
fn send_sync_notification(&self, notification: Vec<u8>) {
let sink = self.lock();
metrics::register_notification_sent(&sink.0.metrics(), &sink.1, notification.len());
metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
sink.0.send_sync_notification(notification);
}
@@ -87,7 +90,7 @@ impl MessageSink for NotificationSink {
.map_err(|_| error::Error::ConnectionClosed)?;
permit.send(notification).map_err(|_| error::Error::ChannelClosed).map(|res| {
metrics::register_notification_sent(&sink.0.metrics(), &sink.1, notification_len);
metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
res
})
}
@@ -220,20 +223,20 @@ impl NotificationHandle {
#[async_trait::async_trait]
impl NotificationService for NotificationHandle {
/// Instruct `Notifications` to open a new substream for `peer`.
async fn open_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
async fn open_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
todo!("support for opening substreams not implemented yet");
}
/// Instruct `Notifications` to close substream for `peer`.
async fn close_substream(&mut self, _peer: PeerId) -> Result<(), ()> {
async fn close_substream(&mut self, _peer: sc_network_types::PeerId) -> Result<(), ()> {
todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
}
/// Send synchronous `notification` to `peer`.
fn send_sync_notification(&self, peer: &PeerId, notification: Vec<u8>) {
if let Some(info) = self.peers.get(&peer) {
fn send_sync_notification(&mut self, peer: &sc_network_types::PeerId, notification: Vec<u8>) {
if let Some(info) = self.peers.get(&((*peer).into())) {
metrics::register_notification_sent(
&info.sink.metrics(),
info.sink.metrics(),
&self.protocol,
notification.len(),
);
@@ -244,12 +247,16 @@ impl NotificationService for NotificationHandle {
/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
async fn send_async_notification(
&self,
peer: &PeerId,
&mut self,
peer: &sc_network_types::PeerId,
notification: Vec<u8>,
) -> Result<(), error::Error> {
let notification_len = notification.len();
let sink = &self.peers.get(&peer).ok_or_else(|| error::Error::PeerDoesntExist(*peer))?.sink;
let sink = &self
.peers
.get(&peer.into())
.ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
.sink;
sink.reserve_notification()
.await
@@ -258,7 +265,7 @@ impl NotificationService for NotificationHandle {
.map_err(|_| error::Error::ChannelClosed)
.map(|res| {
metrics::register_notification_sent(
&sink.metrics(),
sink.metrics(),
&self.protocol,
notification_len,
);
@@ -288,7 +295,7 @@ impl NotificationService for NotificationHandle {
match self.rx.next().await? {
InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } =>
return Some(NotificationEvent::ValidateInboundSubstream {
peer,
peer: peer.into(),
handshake,
result_tx,
}),
@@ -307,7 +314,7 @@ impl NotificationService for NotificationHandle {
},
);
return Some(NotificationEvent::NotificationStreamOpened {
peer,
peer: peer.into(),
handshake,
direction,
negotiated_fallback,
@@ -315,10 +322,13 @@ impl NotificationService for NotificationHandle {
},
InnerNotificationEvent::NotificationStreamClosed { peer } => {
self.peers.remove(&peer);
return Some(NotificationEvent::NotificationStreamClosed { peer })
return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() })
},
InnerNotificationEvent::NotificationReceived { peer, notification } =>
return Some(NotificationEvent::NotificationReceived { peer, notification }),
return Some(NotificationEvent::NotificationReceived {
peer: peer.into(),
notification,
}),
InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
match self.peers.get_mut(&peer) {
None => log::error!(
@@ -357,8 +367,8 @@ impl NotificationService for NotificationHandle {
}
/// Get message sink of the peer.
fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>> {
match self.peers.get(peer) {
fn message_sink(&self, peer: &sc_network_types::PeerId) -> Option<Box<dyn MessageSink>> {
match self.peers.get(&peer.into()) {
Some(context) => Some(Box::new(context.shared_sink.clone())),
None => None,
}
@@ -417,7 +427,7 @@ pub(crate) struct ProtocolHandle {
delegate_to_peerset: bool,
/// Prometheus metrics.
metrics: Option<metrics::Metrics>,
metrics: Option<NotificationMetrics>,
}
pub(crate) enum ValidationCallResult {
@@ -432,8 +442,8 @@ impl ProtocolHandle {
}
/// Set metrics.
pub fn set_metrics(&mut self, metrics: Option<metrics::Metrics>) {
self.metrics = metrics;
pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
self.metrics = Some(metrics);
}
/// Delegate validation to `Peerset`.
@@ -38,7 +38,7 @@ async fn validate_and_accept_substream() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -66,7 +66,7 @@ async fn substream_opened() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -92,7 +92,7 @@ async fn send_sync_notification() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -112,7 +112,7 @@ async fn send_sync_notification() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -120,7 +120,7 @@ async fn send_sync_notification() {
panic!("invalid event received");
}
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 8]);
notif.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 8]);
assert_eq!(
sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 8] })
@@ -144,7 +144,7 @@ async fn send_async_notification() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -164,7 +164,7 @@ async fn send_async_notification() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -172,7 +172,7 @@ async fn send_async_notification() {
panic!("invalid event received");
}
notif.send_async_notification(&peer_id, vec![1, 3, 3, 9]).await.unwrap();
notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]).await.unwrap();
assert_eq!(
async_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, 9] })
@@ -181,24 +181,24 @@ async fn send_async_notification() {
#[tokio::test]
async fn send_sync_notification_to_non_existent_peer() {
let (proto, notif) = notification_service("/proto/1".into());
let (proto, mut notif) = notification_service("/proto/1".into());
let (_sink, _, _sync_rx) = NotificationsSink::new(PeerId::random());
let (_handle, _stream) = proto.split();
let peer = PeerId::random();
// as per the original implementation, the call doesn't fail
notif.send_sync_notification(&peer, vec![1, 3, 3, 7])
notif.send_sync_notification(&peer.into(), vec![1, 3, 3, 7])
}
#[tokio::test]
async fn send_async_notification_to_non_existent_peer() {
let (proto, notif) = notification_service("/proto/1".into());
let (proto, mut notif) = notification_service("/proto/1".into());
let (_sink, _, _sync_rx) = NotificationsSink::new(PeerId::random());
let (_handle, _stream) = proto.split();
let peer = PeerId::random();
if let Err(error::Error::PeerDoesntExist(peer_id)) =
notif.send_async_notification(&peer, vec![1, 3, 3, 7]).await
notif.send_async_notification(&peer.into(), vec![1, 3, 3, 7]).await
{
assert_eq!(peer, peer_id);
} else {
@@ -223,7 +223,7 @@ async fn receive_notification() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -243,7 +243,7 @@ async fn receive_notification() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -257,7 +257,7 @@ async fn receive_notification() {
if let Some(NotificationEvent::NotificationReceived { peer, notification }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(notification, vec![1, 3, 3, 8]);
} else {
panic!("invalid event received");
@@ -281,7 +281,7 @@ async fn backpressure_works() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -301,7 +301,7 @@ async fn backpressure_works() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -311,12 +311,15 @@ async fn backpressure_works() {
// fill the message buffer with messages
for i in 0..=ASYNC_NOTIFICATIONS_BUFFER_SIZE {
assert!(futures::poll!(notif.send_async_notification(&peer_id, vec![1, 3, 3, i as u8]))
.is_ready());
assert!(futures::poll!(
notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, i as u8])
)
.is_ready());
}
// try to send one more message and verify that the call blocks
assert!(futures::poll!(notif.send_async_notification(&peer_id, vec![1, 3, 3, 9])).is_pending());
assert!(futures::poll!(notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]))
.is_pending());
// release one slot from the buffer for new message
assert_eq!(
@@ -325,7 +328,9 @@ async fn backpressure_works() {
);
// verify that a message can be sent
assert!(futures::poll!(notif.send_async_notification(&peer_id, vec![1, 3, 3, 9])).is_ready());
assert!(
futures::poll!(notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9])).is_ready()
);
}
#[tokio::test]
@@ -345,7 +350,7 @@ async fn peer_disconnects_then_sync_notification_is_sent() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -365,7 +370,7 @@ async fn peer_disconnects_then_sync_notification_is_sent() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -379,7 +384,7 @@ async fn peer_disconnects_then_sync_notification_is_sent() {
drop(sync_rx);
// as per documentation, error is not reported but the notification is silently dropped
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 7]);
notif.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 7]);
}
#[tokio::test]
@@ -399,7 +404,7 @@ async fn peer_disconnects_then_async_notification_is_sent() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -419,7 +424,7 @@ async fn peer_disconnects_then_async_notification_is_sent() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -434,7 +439,7 @@ async fn peer_disconnects_then_async_notification_is_sent() {
// as per documentation, error is not reported but the notification is silently dropped
if let Err(error::Error::ConnectionClosed) =
notif.send_async_notification(&peer_id, vec![1, 3, 3, 7]).await
notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, 7]).await
{
} else {
panic!("invalid state after calling `send_async_notification()` on closed connection")
@@ -460,7 +465,7 @@ async fn cloned_service_opening_substream_works() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif1.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -475,7 +480,7 @@ async fn cloned_service_opening_substream_works() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif2.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -505,7 +510,7 @@ async fn cloned_service_one_service_rejects_substream() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -519,7 +524,7 @@ async fn cloned_service_one_service_rejects_substream() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif3.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Reject).unwrap();
} else {
@@ -549,7 +554,7 @@ async fn cloned_service_opening_substream_sending_and_receiving_notifications_wo
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -571,7 +576,7 @@ async fn cloned_service_opening_substream_sending_and_receiving_notifications_wo
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -586,16 +591,16 @@ async fn cloned_service_opening_substream_sending_and_receiving_notifications_wo
if let Some(NotificationEvent::NotificationReceived { peer, notification }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(notification, vec![1, 3, 3, 8]);
} else {
panic!("invalid event received");
}
}
for (i, notif) in vec![&mut notif1, &mut notif2, &mut notif3].iter().enumerate() {
for (i, notif) in vec![&mut notif1, &mut notif2, &mut notif3].iter_mut().enumerate() {
// send notification from each service and verify peer receives it
notif.send_sync_notification(&peer_id, vec![1, 3, 3, i as u8]);
notif.send_sync_notification(&peer_id.into(), vec![1, 3, 3, i as u8]);
assert_eq!(
sync_rx.next().await,
Some(NotificationsSinkMessage::Notification { message: vec![1, 3, 3, i as u8] })
@@ -608,7 +613,7 @@ async fn cloned_service_opening_substream_sending_and_receiving_notifications_wo
for notif in vec![&mut notif1, &mut notif2, &mut notif3] {
if let Some(NotificationEvent::NotificationStreamClosed { peer }) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
} else {
panic!("invalid event received");
}
@@ -632,7 +637,7 @@ async fn sending_notifications_using_notifications_sink_works() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -652,7 +657,7 @@ async fn sending_notifications_using_notifications_sink_works() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -661,7 +666,7 @@ async fn sending_notifications_using_notifications_sink_works() {
}
// get a copy of the notification sink and send a synchronous notification using.
let sink = notif.message_sink(&peer_id).unwrap();
let sink = notif.message_sink(&peer_id.into()).unwrap();
sink.send_sync_notification(vec![1, 3, 3, 6]);
// send an asynchronous notification using the acquired notifications sink.
@@ -677,8 +682,8 @@ async fn sending_notifications_using_notifications_sink_works() {
);
// send notifications using the stored notification sink as well.
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id, vec![1, 3, 3, 9]).await.unwrap();
notif.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]).await.unwrap();
assert_eq!(
sync_rx.next().await,
@@ -693,7 +698,7 @@ async fn sending_notifications_using_notifications_sink_works() {
#[test]
fn try_to_get_notifications_sink_for_non_existent_peer() {
let (_proto, notif) = notification_service("/proto/1".into());
assert!(notif.message_sink(&PeerId::random()).is_none());
assert!(notif.message_sink(&sc_network_types::PeerId::random()).is_none());
}
#[tokio::test]
@@ -713,7 +718,7 @@ async fn notification_sink_replaced() {
if let Some(NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx }) =
notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(handshake, vec![1, 3, 3, 7]);
let _ = result_tx.send(ValidationResult::Accept).unwrap();
} else {
@@ -733,7 +738,7 @@ async fn notification_sink_replaced() {
direction,
}) = notif.next_event().await
{
assert_eq!(peer_id, peer);
assert_eq!(peer_id, peer.into());
assert_eq!(negotiated_fallback, None);
assert_eq!(handshake, vec![1, 3, 3, 7]);
assert_eq!(direction, Direction::Inbound);
@@ -742,7 +747,7 @@ async fn notification_sink_replaced() {
}
// get a copy of the notification sink and send a synchronous notification using.
let sink = notif.message_sink(&peer_id).unwrap();
let sink = notif.message_sink(&peer_id.into()).unwrap();
sink.send_sync_notification(vec![1, 3, 3, 6]);
// send an asynchronous notification using the acquired notifications sink.
@@ -758,8 +763,8 @@ async fn notification_sink_replaced() {
);
// send notifications using the stored notification sink as well.
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id, vec![1, 3, 3, 9]).await.unwrap();
notif.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]).await.unwrap();
assert_eq!(
sync_rx.next().await,
@@ -788,8 +793,8 @@ async fn notification_sink_replaced() {
// verify that using the `NotificationService` API automatically results in using the correct
// sink
notif.send_sync_notification(&peer_id, vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id, vec![1, 3, 3, 9]).await.unwrap();
notif.send_sync_notification(&peer_id.into(), vec![1, 3, 3, 8]);
notif.send_async_notification(&peer_id.into(), vec![1, 3, 3, 9]).await.unwrap();
assert_eq!(
new_sync_rx.next().await,
@@ -22,7 +22,10 @@ use crate::{
peer_store::PeerStore,
protocol::notifications::{Notifications, NotificationsOut, ProtocolConfig},
protocol_controller::{ProtoSetConfig, ProtocolController, SetId},
service::traits::{NotificationEvent, ValidationResult},
service::{
metrics::NotificationMetrics,
traits::{NotificationEvent, ValidationResult},
},
};
use futures::{future::BoxFuture, prelude::*};
@@ -40,6 +43,7 @@ use sc_utils::mpsc::tracing_unbounded;
use std::{
iter,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
@@ -91,7 +95,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
reserved_only: false,
},
to_notifications,
Box::new(peer_store.handle()),
Arc::new(peer_store.handle()),
);
let (notif_handle, command_stream) = protocol_handle_pair.split();
@@ -99,7 +103,7 @@ fn build_nodes() -> (Swarm<CustomProtoWithAddr>, Swarm<CustomProtoWithAddr>) {
inner: Notifications::new(
vec![controller_handle],
from_controller,
None,
NotificationMetrics::new(None),
iter::once((
ProtocolConfig {
name: "/foo".into(),