Remove necessity to pass ConsensusEngineId when registering notifications protocol (#7549)

* Remove necessity to pass ConsensusEngineId when registering notifications protocol

* Line width

* Fix tests protocol name

* Other renames

* Doc update

* Change issue in TODO
This commit is contained in:
Pierre Krieger
2020-11-18 16:05:35 +01:00
committed by GitHub
parent 22a02d3e7a
commit 1eae9f5792
18 changed files with 228 additions and 284 deletions
+16 -15
View File
@@ -30,7 +30,7 @@ use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use log::debug;
use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, Justification};
use std::{
borrow::Cow,
collections::{HashSet, VecDeque},
@@ -131,7 +131,7 @@ pub enum BehaviourOut<B: BlockT> {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
protocol: Cow<'static, str>,
/// Object that permits sending notifications to the peer.
notifications_sink: NotificationsSink,
/// Role of the remote.
@@ -147,7 +147,7 @@ pub enum BehaviourOut<B: BlockT> {
/// Id of the peer we are connected to.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
protocol: Cow<'static, str>,
/// Replacement for the previous [`NotificationsSink`].
notifications_sink: NotificationsSink,
},
@@ -158,7 +158,7 @@ pub enum BehaviourOut<B: BlockT> {
/// Node we closed the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
protocol: Cow<'static, str>,
},
/// Received one or more messages from the given node using the given protocol.
@@ -166,7 +166,7 @@ pub enum BehaviourOut<B: BlockT> {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<(ConsensusEngineId, Bytes)>,
messages: Vec<(Cow<'static, str>, Bytes)>,
},
/// Events generated by a DHT as a response to get_value or put_value requests as well as the
@@ -257,19 +257,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
/// will retain the protocols that were registered then, and not any new one.
pub fn register_notifications_protocol(
&mut self,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, str>>,
protocol: impl Into<Cow<'static, str>>,
) {
let protocol = protocol.into();
// This is the message that we will send to the remote as part of the initial handshake.
// At the moment, we force this to be an encoded `Roles`.
let handshake_message = Roles::from(&self.role).encode();
let list = self.substrate.register_notifications_protocol(engine_id, protocol_name, handshake_message);
let list = self.substrate.register_notifications_protocol(protocol.clone(), handshake_message);
for (remote, roles, notifications_sink) in list {
let role = reported_roles_to_observed_role(&self.role, remote, roles);
self.events.push_back(BehaviourOut::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
protocol: protocol.clone(),
role,
notifications_sink: notifications_sink.clone(),
});
@@ -363,28 +364,28 @@ Behaviour<B, H> {
},
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles, notifications_sink } => {
let role = reported_roles_to_observed_role(&self.role, &remote, roles);
for engine_id in protocols {
for protocol in protocols {
self.events.push_back(BehaviourOut::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
protocol,
role: role.clone(),
notifications_sink: notifications_sink.clone(),
});
}
},
CustomMessageOutcome::NotificationStreamReplaced { remote, protocols, notifications_sink } =>
for engine_id in protocols {
for protocol in protocols {
self.events.push_back(BehaviourOut::NotificationStreamReplaced {
remote: remote.clone(),
engine_id,
protocol,
notifications_sink: notifications_sink.clone(),
});
},
CustomMessageOutcome::NotificationStreamClosed { remote, protocols } =>
for engine_id in protocols {
for protocol in protocols {
self.events.push_back(BehaviourOut::NotificationStreamClosed {
remote: remote.clone(),
engine_id,
protocol,
});
},
CustomMessageOutcome::NotificationsReceived { remote, messages } => {
+3 -4
View File
@@ -41,7 +41,7 @@ use libp2p::{
};
use prometheus_endpoint::Registry;
use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use sp_runtime::traits::Block as BlockT;
use std::{borrow::Cow, convert::TryFrom, future::Future, pin::Pin, str::FromStr};
use std::{
collections::HashMap,
@@ -400,9 +400,8 @@ pub struct NetworkConfiguration {
pub boot_nodes: Vec<MultiaddrWithPeerId>,
/// The node key configuration, which determines the node's network identity keypair.
pub node_key: NodeKeyConfig,
/// List of notifications protocols that the node supports. Must also include a
/// `ConsensusEngineId` for backwards-compatibility.
pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, str>)>,
/// List of names of notifications protocols that the node supports.
pub notifications_protocols: Vec<Cow<'static, str>>,
/// List of request-response protocols that the node supports.
pub request_response_protocols: Vec<RequestResponseConfig>,
/// Maximum allowed number of incoming connections.
+5 -4
View File
@@ -53,8 +53,9 @@ use async_std::sync::{Mutex, MutexGuard};
use futures::prelude::*;
use futures::channel::mpsc::{channel, Receiver, Sender};
use libp2p::PeerId;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use sp_runtime::traits::Block as BlockT;
use std::{
borrow::Cow,
collections::VecDeque,
fmt,
sync::Arc,
@@ -82,7 +83,7 @@ impl<M> QueuedSender<M> {
pub fn new<B, H, F>(
service: Arc<NetworkService<B, H>>,
peer_id: PeerId,
protocol: ConsensusEngineId,
protocol: Cow<'static, str>,
queue_size_limit: usize,
messages_encode: F
) -> (Self, impl Future<Output = ()> + Send + 'static)
@@ -193,7 +194,7 @@ async fn create_background_future<B: BlockT, H: ExHashT, M, F: Fn(M) -> Vec<u8>>
mut wait_for_sender: Receiver<()>,
service: Arc<NetworkService<B, H>>,
peer_id: PeerId,
protocol: ConsensusEngineId,
protocol: Cow<'static, str>,
shared_message_queue: SharedMessageQueue<M>,
messages_encode: F,
) {
@@ -212,7 +213,7 @@ async fn create_background_future<B: BlockT, H: ExHashT, M, F: Fn(M) -> Vec<u8>>
// Starting from below, we try to send the message. If an error happens when sending,
// the only sane option we have is to silently discard the message.
let sender = match service.notification_sender(peer_id.clone(), protocol) {
let sender = match service.notification_sender(peer_id.clone(), protocol.clone()) {
Ok(s) => s,
Err(_) => continue,
};
+7 -7
View File
@@ -20,7 +20,7 @@ use crate::{config, gossip::QueuedSender, Event, NetworkService, NetworkWorker};
use futures::prelude::*;
use sp_runtime::traits::{Block as BlockT, Header as _};
use std::{sync::Arc, time::Duration};
use std::{borrow::Cow, sync::Arc, time::Duration};
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _};
type TestNetworkService = NetworkService<
@@ -120,24 +120,24 @@ fn build_test_full_node(config: config::NetworkConfiguration)
(service, event_stream)
}
const ENGINE_ID: sp_runtime::ConsensusEngineId = *b"foo\0";
const PROTOCOL_NAME: Cow<'static, str> = Cow::Borrowed("/foo");
/// Builds two nodes and their associated events stream.
/// The nodes are connected together and have the `ENGINE_ID` protocol registered.
/// The nodes are connected together and have the `PROTOCOL_NAME` protocol registered.
fn build_nodes_one_proto()
-> (Arc<TestNetworkService>, impl Stream<Item = Event>, Arc<TestNetworkService>, impl Stream<Item = Event>)
{
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
notifications_protocols: vec![PROTOCOL_NAME],
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
.. config::NetworkConfiguration::new_local()
});
let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
notifications_protocols: vec![PROTOCOL_NAME],
listen_addresses: vec![],
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr,
@@ -165,7 +165,7 @@ fn basic_works() {
Event::NotificationStreamClosed { .. } => panic!(),
Event::NotificationsReceived { messages, .. } => {
for message in messages {
assert_eq!(message.0, ENGINE_ID);
assert_eq!(message.0, PROTOCOL_NAME);
assert_eq!(message.1, &b"message"[..]);
received_notifications += 1;
}
@@ -181,7 +181,7 @@ fn basic_works() {
async_std::task::block_on(async move {
let (mut sender, bg_future) =
QueuedSender::new(node1, node2_id, ENGINE_ID, NUM_NOTIFS, |msg| msg);
QueuedSender::new(node1, node2_id, PROTOCOL_NAME, NUM_NOTIFS, |msg| msg);
async_std::task::spawn(bg_future);
// Wait for the `NotificationStreamOpened`.
+27 -55
View File
@@ -37,7 +37,7 @@ use sp_consensus::{
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use codec::{Decode, DecodeAll, Encode};
use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification};
use sp_runtime::{generic::BlockId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
};
@@ -231,8 +231,8 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
transaction_pool: Arc<dyn TransactionPool<H, B>>,
/// Handles opening the unique substream and sending and receiving raw messages.
behaviour: GenericProto,
/// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, str>>,
/// List of notifications protocols that have been registered.
notification_protocols: Vec<Cow<'static, str>>,
/// For each protocol name, the legacy equivalent.
legacy_equiv_by_name: HashMap<Cow<'static, str>, Fallback>,
/// Name of the protocol used for transactions.
@@ -252,6 +252,7 @@ struct PacketStats {
count_in: u64,
count_out: u64,
}
/// Peer information
#[derive(Debug, Clone)]
struct Peer<B: BlockT, H: ExHashT> {
@@ -349,8 +350,8 @@ fn build_status_message<B: BlockT>(protocol_config: &ProtocolConfig, chain: &Arc
/// Fallback mechanism to use to send a notification if no substream is open.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Fallback {
/// Use a `Message::Consensus` with the given engine ID.
Consensus(ConsensusEngineId),
/// Formerly-known as `Consensus` messages. Now regular notifications.
Consensus,
/// The message is the bytes encoding of a `Transactions<E>` (which is itself defined as a `Vec<E>`).
Transactions,
/// The message is the bytes encoding of a `BlockAnnounce<H>`.
@@ -446,7 +447,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
transaction_pool,
peerset_handle: peerset_handle.clone(),
behaviour,
protocol_name_by_engine: HashMap::new(),
notification_protocols: Vec::new(),
legacy_equiv_by_name,
transactions_protocol,
block_announces_protocol,
@@ -621,7 +622,9 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
GenericMessage::RemoteCallRequest(_) |
GenericMessage::RemoteReadRequest(_) |
GenericMessage::RemoteHeaderRequest(_) |
GenericMessage::RemoteChangesRequest(_) => {
GenericMessage::RemoteChangesRequest(_) |
GenericMessage::Consensus(_) |
GenericMessage::ConsensusBatch(_) => {
debug!(
target: "sub-libp2p",
"Received no longer supported legacy request from {:?}",
@@ -630,38 +633,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
self.disconnect_peer(&who);
self.peerset_handle.report_peer(who, rep::BAD_PROTOCOL);
},
GenericMessage::Consensus(msg) =>
return if self.protocol_name_by_engine.contains_key(&msg.engine_id) {
CustomMessageOutcome::NotificationsReceived {
remote: who,
messages: vec![(msg.engine_id, From::from(msg.data))],
}
} else {
debug!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
CustomMessageOutcome::None
},
GenericMessage::ConsensusBatch(messages) => {
let messages = messages
.into_iter()
.filter_map(|msg| {
if self.protocol_name_by_engine.contains_key(&msg.engine_id) {
Some((msg.engine_id, From::from(msg.data)))
} else {
debug!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
None
}
})
.collect::<Vec<_>>();
return if !messages.is_empty() {
CustomMessageOutcome::NotificationsReceived {
remote: who,
messages,
}
} else {
CustomMessageOutcome::None
};
},
}
CustomMessageOutcome::None
@@ -685,7 +656,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
// Notify all the notification protocols as closed.
CustomMessageOutcome::NotificationStreamClosed {
remote: peer,
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
protocols: self.notification_protocols.clone(),
}
} else {
CustomMessageOutcome::None
@@ -939,7 +910,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
// Notify all the notification protocols as open.
CustomMessageOutcome::NotificationStreamOpened {
remote: who,
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
protocols: self.notification_protocols.clone(),
roles: info.roles,
notifications_sink,
}
@@ -952,16 +923,17 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
/// returns a list of substreams to open as a result.
pub fn register_notifications_protocol<'a>(
&'a mut self,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, str>>,
protocol: impl Into<Cow<'static, str>>,
handshake_message: Vec<u8>,
) -> impl Iterator<Item = (&'a PeerId, Roles, &'a NotificationsSink)> + 'a {
let protocol_name = protocol_name.into();
if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() {
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name);
let protocol = protocol.into();
if self.notification_protocols.iter().any(|p| *p == protocol) {
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol);
} else {
self.behaviour.register_notif_protocol(protocol_name.clone(), handshake_message);
self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id));
self.notification_protocols.push(protocol.clone());
self.behaviour.register_notif_protocol(protocol.clone(), handshake_message);
self.legacy_equiv_by_name.insert(protocol, Fallback::Consensus);
}
let behaviour = &self.behaviour;
@@ -1450,20 +1422,20 @@ pub enum CustomMessageOutcome<B: BlockT> {
/// Notification protocols have been opened with a remote.
NotificationStreamOpened {
remote: PeerId,
protocols: Vec<ConsensusEngineId>,
protocols: Vec<Cow<'static, str>>,
roles: Roles,
notifications_sink: NotificationsSink
},
/// The [`NotificationsSink`] of some notification protocols need an update.
NotificationStreamReplaced {
remote: PeerId,
protocols: Vec<ConsensusEngineId>,
protocols: Vec<Cow<'static, str>>,
notifications_sink: NotificationsSink,
},
/// Notification protocols have been closed with a remote.
NotificationStreamClosed { remote: PeerId, protocols: Vec<ConsensusEngineId> },
NotificationStreamClosed { remote: PeerId, protocols: Vec<Cow<'static, str>> },
/// Messages have been received on one or more notifications protocols.
NotificationsReceived { remote: PeerId, messages: Vec<(ConsensusEngineId, Bytes)> },
NotificationsReceived { remote: PeerId, messages: Vec<(Cow<'static, str>, Bytes)> },
/// A new block request must be emitted.
/// You must later call either [`Protocol::on_block_response`] or
/// [`Protocol::on_block_request_failed`].
@@ -1664,7 +1636,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
GenericProtoOut::CustomProtocolReplaced { peer_id, notifications_sink, .. } => {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
protocols: self.notification_protocols.clone(),
notifications_sink,
}
},
@@ -1675,10 +1647,10 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
self.on_custom_message(peer_id, message),
GenericProtoOut::Notification { peer_id, protocol_name, message } =>
match self.legacy_equiv_by_name.get(&protocol_name) {
Some(Fallback::Consensus(engine_id)) => {
Some(Fallback::Consensus) => {
CustomMessageOutcome::NotificationsReceived {
remote: peer_id,
messages: vec![(*engine_id, message.freeze())],
messages: vec![(protocol_name, message.freeze())],
}
}
Some(Fallback::Transactions) => {
@@ -20,7 +20,7 @@
use bytes::Bytes;
use libp2p::core::PeerId;
use libp2p::kad::record::Key;
use sp_runtime::ConsensusEngineId;
use std::borrow::Cow;
/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
@@ -53,7 +53,7 @@ pub enum Event {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
protocol: Cow<'static, str>,
/// Role of the remote.
role: ObservedRole,
},
@@ -64,7 +64,7 @@ pub enum Event {
/// Node we closed the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
protocol: Cow<'static, str>,
},
/// Received one or more messages from the given node using the given protocol.
@@ -72,7 +72,7 @@ pub enum Event {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<(ConsensusEngineId, Bytes)>,
messages: Vec<(Cow<'static, str>, Bytes)>,
},
}
@@ -216,7 +216,7 @@ pub mod generic {
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct ConsensusMessage {
/// Identifies consensus engine.
pub engine_id: ConsensusEngineId,
pub protocol: ConsensusEngineId,
/// Message payload.
pub data: Vec<u8>,
}
+46 -81
View File
@@ -53,10 +53,7 @@ use metrics::{Metrics, MetricSources, Histogram, HistogramVec};
use parking_lot::Mutex;
use sc_peerset::PeersetHandle;
use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link};
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
ConsensusEngineId,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{
borrow::Cow,
@@ -100,9 +97,7 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
to_worker: TracingUnboundedSender<ServiceToWorkerMsg<B, H>>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Updated by the [`NetworkWorker`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
/// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine: Mutex<HashMap<ConsensusEngineId, Cow<'static, str>>>,
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, Cow<'static, str>), NotificationsSink>>>,
/// Field extracted from the [`Metrics`] struct and necessary to report the
/// notifications-related metrics.
notifications_sizes_metric: Option<HistogramVec>,
@@ -331,8 +326,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
}
};
for (engine_id, protocol_name) in &params.network_config.notifications_protocols {
behaviour.register_notifications_protocol(*engine_id, protocol_name.clone());
for protocol in &params.network_config.notifications_protocols {
behaviour.register_notifications_protocol(protocol.clone());
}
let (transport, bandwidth) = {
let (config_mem, config_wasm) = match params.network_config.transport {
@@ -384,9 +379,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
let external_addresses = Arc::new(Mutex::new(Vec::new()));
let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new()));
let protocol_name_by_engine = Mutex::new({
params.network_config.notifications_protocols.iter().cloned().collect()
});
let service = Arc::new(NetworkService {
bandwidth,
@@ -397,7 +389,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
local_peer_id,
to_worker,
peers_notifications_sinks: peers_notifications_sinks.clone(),
protocol_name_by_engine,
notifications_sizes_metric:
metrics.as_ref().map(|metrics| metrics.notifications_sizes.clone()),
_marker: PhantomData,
@@ -640,40 +631,32 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// The protocol must have been registered with `register_notifications_protocol` or
/// [`NetworkConfiguration::notifications_protocols`](crate::config::NetworkConfiguration::notifications_protocols).
///
pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
pub fn write_notification(&self, target: PeerId, protocol: Cow<'static, str>, message: Vec<u8>) {
// We clone the `NotificationsSink` in order to be able to unlock the network-wide
// `peers_notifications_sinks` mutex as soon as possible.
let sink = {
let peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(sink) = peers_notifications_sinks.get(&(target, engine_id)) {
if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) {
sink.clone()
} else {
// Notification silently discarded, as documented.
log::error!(
target: "sub-libp2p",
"Attempted to send notification on unknown protocol: {:?}",
protocol,
);
return;
}
};
// Used later for the metrics report.
let message_len = message.len();
// Determine the wire protocol name corresponding to this `engine_id`.
let protocol_name = self.protocol_name_by_engine.lock().get(&engine_id).cloned();
if let Some(protocol_name) = protocol_name {
sink.send_sync_notification(protocol_name, message);
} else {
log::error!(
target: "sub-libp2p",
"Attempted to send notification on unknown protocol: {:?}",
engine_id,
);
return;
}
if let Some(notifications_sizes_metric) = self.notifications_sizes_metric.as_ref() {
notifications_sizes_metric
.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
.observe(message_len as f64);
.with_label_values(&["out", &protocol])
.observe(message.len() as f64);
}
// Sending is communicated to the `NotificationsSink`.
sink.send_sync_notification(protocol, message);
}
/// Obtains a [`NotificationSender`] for a connected peer, if it exists.
@@ -746,31 +729,27 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
pub fn notification_sender(
&self,
target: PeerId,
engine_id: ConsensusEngineId,
protocol: Cow<'static, str>,
) -> Result<NotificationSender, NotificationSenderError> {
// We clone the `NotificationsSink` in order to be able to unlock the network-wide
// `peers_notifications_sinks` mutex as soon as possible.
let sink = {
let peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(sink) = peers_notifications_sinks.get(&(target, engine_id)) {
if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) {
sink.clone()
} else {
return Err(NotificationSenderError::Closed);
}
};
// Determine the wire protocol name corresponding to this `engine_id`.
let protocol_name = match self.protocol_name_by_engine.lock().get(&engine_id).cloned() {
Some(p) => p,
None => return Err(NotificationSenderError::BadProtocol),
};
let notification_size_metric = self.notifications_sizes_metric.as_ref().map(|histogram| {
histogram.with_label_values(&["out", &protocol])
});
Ok(NotificationSender {
sink,
protocol_name,
notification_size_metric: self.notifications_sizes_metric.as_ref().map(|histogram| {
histogram.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
}),
protocol_name: protocol,
notification_size_metric,
})
}
@@ -841,17 +820,13 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
///
/// Please call `event_stream` before registering a protocol, otherwise you may miss events
/// about the protocol that you have registered.
// TODO: remove this method after https://github.com/paritytech/substrate/issues/4587
// TODO: remove this method after https://github.com/paritytech/substrate/issues/6827
pub fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, str>>,
) {
let protocol_name = protocol_name.into();
self.protocol_name_by_engine.lock().insert(engine_id, protocol_name.clone());
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RegisterNotifProtocol {
engine_id,
protocol_name,
protocol_name: protocol_name.into(),
});
}
@@ -1209,7 +1184,6 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
RegisterNotifProtocol {
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, str>,
},
DisconnectPeer(PeerId),
@@ -1253,7 +1227,7 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
>,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Shared with the [`NetworkService`].
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, Cow<'static, str>), NotificationsSink>>>,
}
impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
@@ -1347,10 +1321,8 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
},
}
},
ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
this.network_service
.register_notifications_protocol(engine_id, protocol_name);
},
ServiceToWorkerMsg::RegisterNotifProtocol { protocol_name } =>
this.network_service.register_notifications_protocol(protocol_name),
ServiceToWorkerMsg::DisconnectPeer(who) =>
this.network_service.user_protocol_mut().disconnect_peer(&who),
ServiceToWorkerMsg::UpdateChain =>
@@ -1474,24 +1446,28 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
.inc();
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { remote, engine_id, notifications_sink, role })) => {
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened {
remote, protocol, notifications_sink, role
})) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.notifications_streams_opened_total
.with_label_values(&[&maybe_utf8_bytes_to_string(&engine_id)]).inc();
.with_label_values(&[&protocol]).inc();
}
{
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
peers_notifications_sinks.insert((remote.clone(), engine_id), notifications_sink);
peers_notifications_sinks.insert((remote.clone(), protocol.clone()), notifications_sink);
}
this.event_streams.send(Event::NotificationStreamOpened {
remote,
engine_id,
protocol,
role,
});
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced { remote, engine_id, notifications_sink })) => {
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced {
remote, protocol, notifications_sink
})) => {
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
if let Some(s) = peers_notifications_sinks.get_mut(&(remote, engine_id)) {
if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) {
*s = notifications_sink;
} else {
log::error!(
@@ -1513,33 +1489,33 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
// https://github.com/paritytech/substrate/issues/6403.
/*this.event_streams.send(Event::NotificationStreamClosed {
remote,
engine_id,
protocol,
});
this.event_streams.send(Event::NotificationStreamOpened {
remote,
engine_id,
protocol,
role,
});*/
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, engine_id })) => {
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, protocol })) => {
if let Some(metrics) = this.metrics.as_ref() {
metrics.notifications_streams_closed_total
.with_label_values(&[&maybe_utf8_bytes_to_string(&engine_id[..])]).inc();
.with_label_values(&[&protocol[..]]).inc();
}
this.event_streams.send(Event::NotificationStreamClosed {
remote: remote.clone(),
engine_id,
protocol: protocol.clone(),
});
{
let mut peers_notifications_sinks = this.peers_notifications_sinks.lock();
peers_notifications_sinks.remove(&(remote.clone(), engine_id));
peers_notifications_sinks.remove(&(remote.clone(), protocol));
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { remote, messages })) => {
if let Some(metrics) = this.metrics.as_ref() {
for (engine_id, message) in &messages {
for (protocol, message) in &messages {
metrics.notifications_sizes
.with_label_values(&["in", &maybe_utf8_bytes_to_string(engine_id)])
.with_label_values(&["in", protocol])
.observe(message.len() as f64);
}
}
@@ -1748,17 +1724,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
}
/// Turns bytes that are potentially UTF-8 into a reasonable representable string.
///
/// Meant to be used only for debugging or metrics-reporting purposes.
pub(crate) fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> {
if let Ok(s) = std::str::from_utf8(&id[..]) {
Cow::Borrowed(s)
} else {
Cow::Owned(format!("{:?}", id))
}
}
/// The libp2p swarm, customized for our needs.
type Swarm<B, H> = libp2p::swarm::Swarm<Behaviour<B, H>>;
@@ -33,7 +33,6 @@
//!
use crate::Event;
use super::maybe_utf8_bytes_to_string;
use futures::{prelude::*, channel::mpsc, ready, stream::FusedStream};
use parking_lot::Mutex;
@@ -228,23 +227,23 @@ impl Metrics {
.with_label_values(&["dht", "sent", name])
.inc_by(num);
}
Event::NotificationStreamOpened { engine_id, .. } => {
Event::NotificationStreamOpened { protocol, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent", name])
.with_label_values(&[&format!("notif-open-{:?}", protocol), "sent", name])
.inc_by(num);
},
Event::NotificationStreamClosed { engine_id, .. } => {
Event::NotificationStreamClosed { protocol, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent", name])
.with_label_values(&[&format!("notif-closed-{:?}", protocol), "sent", name])
.inc_by(num);
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
for (protocol, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent", name])
.with_label_values(&[&format!("notif-{:?}", protocol), "sent", name])
.inc_by(num);
self.notifications_sizes
.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id), "sent", name])
.with_label_values(&[protocol, "sent", name])
.inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value())));
}
},
@@ -258,23 +257,23 @@ impl Metrics {
.with_label_values(&["dht", "received", name])
.inc();
}
Event::NotificationStreamOpened { engine_id, .. } => {
Event::NotificationStreamOpened { protocol, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received", name])
.with_label_values(&[&format!("notif-open-{:?}", protocol), "received", name])
.inc();
},
Event::NotificationStreamClosed { engine_id, .. } => {
Event::NotificationStreamClosed { protocol, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received", name])
.with_label_values(&[&format!("notif-closed-{:?}", protocol), "received", name])
.inc();
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
for (protocol, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "received", name])
.with_label_values(&[&format!("notif-{:?}", protocol), "received", name])
.inc();
self.notifications_sizes
.with_label_values(&[&maybe_utf8_bytes_to_string(engine_id), "received", name])
.with_label_values(&[&protocol, "received", name])
.inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value()));
}
},
+23 -23
View File
@@ -21,7 +21,7 @@ use crate::{config, Event, NetworkService, NetworkWorker};
use libp2p::PeerId;
use futures::prelude::*;
use sp_runtime::traits::{Block as BlockT, Header as _};
use std::{sync::Arc, time::Duration};
use std::{borrow::Cow, sync::Arc, time::Duration};
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _};
type TestNetworkService = NetworkService<
@@ -121,24 +121,24 @@ fn build_test_full_node(config: config::NetworkConfiguration)
(service, event_stream)
}
const ENGINE_ID: sp_runtime::ConsensusEngineId = *b"foo\0";
const PROTOCOL_NAME: Cow<'static, str> = Cow::Borrowed("/foo");
/// Builds two nodes and their associated events stream.
/// The nodes are connected together and have the `ENGINE_ID` protocol registered.
/// The nodes are connected together and have the `PROTOCOL_NAME` protocol registered.
fn build_nodes_one_proto()
-> (Arc<TestNetworkService>, impl Stream<Item = Event>, Arc<TestNetworkService>, impl Stream<Item = Event>)
{
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
notifications_protocols: vec![PROTOCOL_NAME],
listen_addresses: vec![listen_addr.clone()],
transport: config::TransportConfig::MemoryOnly,
.. config::NetworkConfiguration::new_local()
});
let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
notifications_protocols: vec![PROTOCOL_NAME],
listen_addresses: vec![],
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr,
@@ -161,10 +161,10 @@ fn notifications_state_consistent() {
// Write some initial notifications that shouldn't get through.
for _ in 0..(rand::random::<u8>() % 5) {
node1.write_notification(node2.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec());
node1.write_notification(node2.local_peer_id().clone(), PROTOCOL_NAME, b"hello world".to_vec());
}
for _ in 0..(rand::random::<u8>() % 5) {
node2.write_notification(node1.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec());
node2.write_notification(node1.local_peer_id().clone(), PROTOCOL_NAME, b"hello world".to_vec());
}
async_std::task::block_on(async move {
@@ -187,10 +187,10 @@ fn notifications_state_consistent() {
// Start by sending a notification from node1 to node2 and vice-versa. Part of the
// test consists in ensuring that notifications get ignored if the stream isn't open.
if rand::random::<u8>() % 5 >= 3 {
node1.write_notification(node2.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec());
node1.write_notification(node2.local_peer_id().clone(), PROTOCOL_NAME, b"hello world".to_vec());
}
if rand::random::<u8>() % 5 >= 3 {
node2.write_notification(node1.local_peer_id().clone(), ENGINE_ID, b"hello world".to_vec());
node2.write_notification(node1.local_peer_id().clone(), PROTOCOL_NAME, b"hello world".to_vec());
}
// Also randomly disconnect the two nodes from time to time.
@@ -219,31 +219,31 @@ fn notifications_state_consistent() {
};
match next_event {
future::Either::Left(Event::NotificationStreamOpened { remote, engine_id, .. }) => {
future::Either::Left(Event::NotificationStreamOpened { remote, protocol, .. }) => {
something_happened = true;
assert!(!node1_to_node2_open);
node1_to_node2_open = true;
assert_eq!(remote, *node2.local_peer_id());
assert_eq!(engine_id, ENGINE_ID);
assert_eq!(protocol, PROTOCOL_NAME);
}
future::Either::Right(Event::NotificationStreamOpened { remote, engine_id, .. }) => {
future::Either::Right(Event::NotificationStreamOpened { remote, protocol, .. }) => {
something_happened = true;
assert!(!node2_to_node1_open);
node2_to_node1_open = true;
assert_eq!(remote, *node1.local_peer_id());
assert_eq!(engine_id, ENGINE_ID);
assert_eq!(protocol, PROTOCOL_NAME);
}
future::Either::Left(Event::NotificationStreamClosed { remote, engine_id, .. }) => {
future::Either::Left(Event::NotificationStreamClosed { remote, protocol, .. }) => {
assert!(node1_to_node2_open);
node1_to_node2_open = false;
assert_eq!(remote, *node2.local_peer_id());
assert_eq!(engine_id, ENGINE_ID);
assert_eq!(protocol, PROTOCOL_NAME);
}
future::Either::Right(Event::NotificationStreamClosed { remote, engine_id, .. }) => {
future::Either::Right(Event::NotificationStreamClosed { remote, protocol, .. }) => {
assert!(node2_to_node1_open);
node2_to_node1_open = false;
assert_eq!(remote, *node1.local_peer_id());
assert_eq!(engine_id, ENGINE_ID);
assert_eq!(protocol, PROTOCOL_NAME);
}
future::Either::Left(Event::NotificationsReceived { remote, .. }) => {
assert!(node1_to_node2_open);
@@ -251,7 +251,7 @@ fn notifications_state_consistent() {
if rand::random::<u8>() % 5 >= 4 {
node1.write_notification(
node2.local_peer_id().clone(),
ENGINE_ID,
PROTOCOL_NAME,
b"hello world".to_vec()
);
}
@@ -262,7 +262,7 @@ fn notifications_state_consistent() {
if rand::random::<u8>() % 5 >= 4 {
node2.write_notification(
node1.local_peer_id().clone(),
ENGINE_ID,
PROTOCOL_NAME,
b"hello world".to_vec()
);
}
@@ -281,7 +281,7 @@ fn lots_of_incoming_peers_works() {
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (main_node, _) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
notifications_protocols: vec![PROTOCOL_NAME],
listen_addresses: vec![listen_addr.clone()],
in_peers: u32::max_value(),
transport: config::TransportConfig::MemoryOnly,
@@ -298,7 +298,7 @@ fn lots_of_incoming_peers_works() {
let main_node_peer_id = main_node_peer_id.clone();
let (_dialing_node, event_stream) = build_test_full_node(config::NetworkConfiguration {
notifications_protocols: vec![(ENGINE_ID, From::from("/foo"))],
notifications_protocols: vec![PROTOCOL_NAME],
listen_addresses: vec![],
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr.clone(),
@@ -364,7 +364,7 @@ fn notifications_back_pressure() {
Event::NotificationStreamClosed { .. } => panic!(),
Event::NotificationsReceived { messages, .. } => {
for message in messages {
assert_eq!(message.0, ENGINE_ID);
assert_eq!(message.0, PROTOCOL_NAME);
assert_eq!(message.1, format!("hello #{}", received_notifications));
received_notifications += 1;
}
@@ -389,7 +389,7 @@ fn notifications_back_pressure() {
// Sending!
for num in 0..TOTAL_NOTIFS {
let notif = node1.notification_sender(node2_id.clone(), ENGINE_ID).unwrap();
let notif = node1.notification_sender(node2_id.clone(), PROTOCOL_NAME).unwrap();
notif.ready().await.unwrap().send(format!("hello #{}", num)).unwrap();
}