sc-network: Log outgoing notifications too (#7624)

* Log outgoing notifications too

* Update client/network/src/protocol/generic_proto/handler.rs

Co-authored-by: Max Inden <mail@max-inden.de>

Co-authored-by: Addie Wagenknecht <addie@nortd.com>
Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
Pierre Krieger
2020-11-30 11:52:08 +01:00
committed by GitHub
parent f6dc35284f
commit a2b14d10fb
3 changed files with 52 additions and 6 deletions
@@ -650,13 +650,17 @@ impl GenericProto {
Some(sink) => sink Some(sink) => sink
}; };
let message = message.into();
trace!( trace!(
target: "sub-libp2p", target: "sub-libp2p",
"External API => Notification({:?}, {:?})", "External API => Notification({:?}, {:?}, {} bytes)",
target, target,
protocol_name, protocol_name,
message.len(),
); );
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);
notifs_sink.send_sync_notification( notifs_sink.send_sync_notification(
protocol_name, protocol_name,
message message
@@ -1930,9 +1934,10 @@ impl NetworkBehaviour for GenericProto {
if self.is_open(&source) { if self.is_open(&source) {
trace!( trace!(
target: "sub-libp2p", target: "sub-libp2p",
"Handler({:?}) => Notification({:?})", "Handler({:?}) => Notification({:?}, {} bytes)",
source, source,
protocol_name, protocol_name,
message.len()
); );
trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source); trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source);
let event = GenericProtoOut::Notification { let event = GenericProtoOut::Notification {
@@ -1945,9 +1950,10 @@ impl NetworkBehaviour for GenericProto {
} else { } else {
trace!( trace!(
target: "sub-libp2p", target: "sub-libp2p",
"Handler({:?}) => Post-close notification({:?})", "Handler({:?}) => Post-close notification({:?}, {} bytes)",
source, source,
protocol_name, protocol_name,
message.len()
); );
} }
} }
@@ -138,6 +138,9 @@ pub struct NotifsHandler {
/// Whether we are the connection dialer or listener. /// Whether we are the connection dialer or listener.
endpoint: ConnectedPoint, endpoint: ConnectedPoint,
/// Remote we are connected to.
peer_id: PeerId,
/// State of this handler. /// State of this handler.
state: State, state: State,
@@ -260,12 +263,13 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
SelectUpgrade::new(in_protocols, self.legacy_protocol.clone()) SelectUpgrade::new(in_protocols, self.legacy_protocol.clone())
} }
fn into_handler(self, _: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
let num_out_proto = self.out_protocols.len(); let num_out_proto = self.out_protocols.len();
NotifsHandler { NotifsHandler {
in_protocols: self.in_protocols, in_protocols: self.in_protocols,
out_protocols: self.out_protocols, out_protocols: self.out_protocols,
peer_id: peer_id.clone(),
endpoint: connected_point.clone(), endpoint: connected_point.clone(),
when_connection_open: Instant::now(), when_connection_open: Instant::now(),
state: State::Closed { state: State::Closed {
@@ -365,6 +369,8 @@ pub struct NotificationsSink {
#[derive(Debug)] #[derive(Debug)]
struct NotificationsSinkInner { struct NotificationsSinkInner {
/// Target of the sink.
peer_id: PeerId,
/// Sender to use in asynchronous contexts. Uses an asynchronous mutex. /// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>, async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
/// Sender to use in synchronous contexts. Uses a synchronous mutex. /// Sender to use in synchronous contexts. Uses a synchronous mutex.
@@ -390,6 +396,11 @@ enum NotificationsSinkMessage {
} }
impl NotificationsSink { impl NotificationsSink {
/// Returns the [`PeerId`] the sink is connected to.
pub fn peer_id(&self) -> &PeerId {
&self.inner.peer_id
}
/// Sends a notification to the peer. /// Sends a notification to the peer.
/// ///
/// If too many messages are already buffered, the notification is silently discarded and the /// If too many messages are already buffered, the notification is silently discarded and the
@@ -447,6 +458,12 @@ pub struct Ready<'a> {
} }
impl<'a> Ready<'a> { impl<'a> Ready<'a> {
/// Returns the name of the protocol. Matches the one passed to
/// [`NotificationsSink::reserve_notification`].
pub fn protocol_name(&self) -> &Cow<'static, str> {
&self.protocol_name
}
/// Consumes this slots reservation and actually queues the notification. /// Consumes this slots reservation and actually queues the notification.
/// ///
/// Returns an error if the substream has been closed. /// Returns an error if the substream has been closed.
@@ -622,6 +639,7 @@ impl ProtocolsHandler for NotifsHandler {
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink { let notifications_sink = NotificationsSink {
inner: Arc::new(NotificationsSinkInner { inner: Arc::new(NotificationsSinkInner {
peer_id: self.peer_id.clone(),
async_channel: FuturesMutex::new(async_tx), async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx), sync_channel: Mutex::new(sync_tx),
}), }),
@@ -782,6 +800,7 @@ impl ProtocolsHandler for NotifsHandler {
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink { let notifications_sink = NotificationsSink {
inner: Arc::new(NotificationsSinkInner { inner: Arc::new(NotificationsSinkInner {
peer_id: self.peer_id.clone(),
async_channel: FuturesMutex::new(async_tx), async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx), sync_channel: Mutex::new(sync_tx),
}), }),
+22 -1
View File
@@ -664,7 +664,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
// `peers_notifications_sinks` mutex as soon as possible. // `peers_notifications_sinks` mutex as soon as possible.
let sink = { let sink = {
let peers_notifications_sinks = self.peers_notifications_sinks.lock(); let peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) { if let Some(sink) = peers_notifications_sinks.get(&(target.clone(), protocol.clone())) {
sink.clone() sink.clone()
} else { } else {
// Notification silently discarded, as documented. // Notification silently discarded, as documented.
@@ -684,6 +684,14 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
} }
// Sending is communicated to the `NotificationsSink`. // Sending is communicated to the `NotificationsSink`.
trace!(
target: "sub-libp2p",
"External API => Notification({:?}, {:?}, {} bytes)",
target,
protocol,
message.len()
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);
sink.send_sync_notification(protocol, message); sink.send_sync_notification(protocol, message);
} }
@@ -1139,6 +1147,7 @@ impl NotificationSender {
Ok(r) => r, Ok(r) => r,
Err(()) => return Err(NotificationSenderError::Closed), Err(()) => return Err(NotificationSenderError::Closed),
}, },
peer_id: self.sink.peer_id(),
notification_size_metric: self.notification_size_metric.clone(), notification_size_metric: self.notification_size_metric.clone(),
}) })
} }
@@ -1149,6 +1158,9 @@ impl NotificationSender {
pub struct NotificationSenderReady<'a> { pub struct NotificationSenderReady<'a> {
ready: Ready<'a>, ready: Ready<'a>,
/// Target of the notification.
peer_id: &'a PeerId,
/// Field extracted from the [`Metrics`] struct and necessary to report the /// Field extracted from the [`Metrics`] struct and necessary to report the
/// notifications-related metrics. /// notifications-related metrics.
notification_size_metric: Option<Histogram>, notification_size_metric: Option<Histogram>,
@@ -1163,6 +1175,15 @@ impl<'a> NotificationSenderReady<'a> {
notification_size_metric.observe(notification.len() as f64); notification_size_metric.observe(notification.len() as f64);
} }
trace!(
target: "sub-libp2p",
"External API => Notification({:?}, {:?}, {} bytes)",
self.peer_id,
self.ready.protocol_name(),
notification.len()
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Async notification", self.peer_id);
self.ready self.ready
.send(notification) .send(notification)
.map_err(|()| NotificationSenderError::Closed) .map_err(|()| NotificationSenderError::Closed)