Move block announcement protocol config out of Protocol (#12441)

* Move Role(s) to `sc-network-common`

* Introduce `NotificationHandshake` type

* Move block announce protocol config creation to `ChainSync`

* Include block announcement into `notification_protocols`

* Apply review comments

* Remove unneeded include

* Add missing include

* Apply review comments
This commit is contained in:
Aaro Altonen
2022-10-10 10:10:53 +03:00
committed by GitHub
parent 0b77060986
commit ce9ce49bc6
23 changed files with 439 additions and 263 deletions
+3 -2
View File
@@ -19,7 +19,7 @@
use crate::{
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
peer_info,
protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol},
protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
request_responses,
};
@@ -41,7 +41,8 @@ use sc_consensus::import_queue::{IncomingBlock, RuntimeOrigin};
use sc_network_common::{
config::ProtocolId,
protocol::{
event::{DhtEvent, ObservedRole},
event::DhtEvent,
role::{ObservedRole, Roles},
ProtocolName,
},
request_responses::{IfDisconnected, ProtocolConfig, RequestFailure},
+4 -25
View File
@@ -23,6 +23,7 @@
pub use sc_network_common::{
config::ProtocolId,
protocol::role::Role,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
@@ -93,6 +94,9 @@ where
/// Registry for recording prometheus metrics to.
pub metrics_registry: Option<Registry>,
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,
/// Request response configuration for the block request protocol.
///
/// [`RequestResponseConfig::name`] is used to tag outgoing block requests with the correct
@@ -130,31 +134,6 @@ where
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
/// Role of the local node.
#[derive(Debug, Clone)]
pub enum Role {
/// Regular full node.
Full,
/// Actual authority.
Authority,
}
impl Role {
/// True for [`Role::Authority`].
pub fn is_authority(&self) -> bool {
matches!(self, Self::Authority { .. })
}
}
impl fmt::Display for Role {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Full => write!(f, "FULL"),
Self::Authority { .. } => write!(f, "AUTHORITY"),
}
}
}
/// Sync operation mode.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum SyncMode {
+2 -1
View File
@@ -260,7 +260,8 @@ pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use protocol::PeerInfo;
pub use sc_network_common::{
protocol::{
event::{DhtEvent, Event, ObservedRole},
event::{DhtEvent, Event},
role::ObservedRole,
ProtocolName,
},
request_responses::{IfDisconnected, RequestFailure},
+35 -116
View File
@@ -31,10 +31,7 @@ use libp2p::{
Multiaddr, PeerId,
};
use log::{debug, error, info, log, trace, warn, Level};
use message::{
generic::{Message as GenericMessage, Roles},
Message,
};
use message::{generic::Message as GenericMessage, Message};
use notifications::{Notifications, NotificationsOut};
use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64};
use sc_client_api::HeaderBackend;
@@ -42,13 +39,14 @@ use sc_consensus::import_queue::{
BlockImportError, BlockImportStatus, IncomingBlock, RuntimeOrigin,
};
use sc_network_common::{
config::{NonReservedPeerMode, ProtocolId},
config::NonReservedPeerMode,
error,
protocol::ProtocolName,
protocol::{role::Roles, ProtocolName},
request_responses::RequestFailure,
sync::{
message::{
BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState,
BlockAnnounce, BlockAnnouncesHandshake, BlockAttributes, BlockData, BlockRequest,
BlockResponse, BlockState,
},
warp::{EncodedProof, WarpProofRequest},
BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, OpaqueBlockRequest,
@@ -85,8 +83,6 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
/// Maximum number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
/// Maximum allowed size for a block announce.
const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024;
/// Maximum size used for notifications in the block announce and transaction protocols.
// Must be equal to `max(MAX_BLOCK_ANNOUNCE_SIZE, MAX_TRANSACTIONS_SIZE)`.
@@ -235,30 +231,6 @@ pub struct PeerInfo<B: BlockT> {
pub best_number: <B::Header as HeaderT>::Number,
}
/// Handshake sent when we open a block announces substream.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
struct BlockAnnouncesHandshake<B: BlockT> {
/// Roles of the node.
roles: Roles,
/// Best block number.
best_number: NumberFor<B>,
/// Best block hash.
best_hash: B::Hash,
/// Genesis block hash.
genesis_hash: B::Hash,
}
impl<B: BlockT> BlockAnnouncesHandshake<B> {
fn build(
roles: Roles,
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
) -> Self {
Self { genesis_hash, roles, best_number, best_hash }
}
}
impl<B, Client> Protocol<B, Client>
where
B: BlockT,
@@ -268,12 +240,10 @@ where
pub fn new(
roles: Roles,
chain: Arc<Client>,
protocol_id: ProtocolId,
fork_id: &Option<String>,
network_config: &config::NetworkConfiguration,
notifications_protocols_handshakes: Vec<Vec<u8>>,
metrics_registry: Option<&Registry>,
chain_sync: Box<dyn ChainSync<B>>,
block_announces_protocol: sc_network_common::config::NonDefaultSetConfig,
) -> error::Result<(Self, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> {
let info = chain.info();
@@ -365,51 +335,24 @@ where
sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets })
};
let block_announces_protocol = {
let genesis_hash =
chain.hash(0u32.into()).ok().flatten().expect("Genesis block exists; qed");
let genesis_hash = genesis_hash.as_ref();
if let Some(fork_id) = fork_id {
format!(
"/{}/{}/block-announces/1",
array_bytes::bytes2hex("", genesis_hash),
fork_id
)
} else {
format!("/{}/block-announces/1", array_bytes::bytes2hex("", genesis_hash))
}
};
let legacy_ba_protocol_name = format!("/{}/block-announces/1", protocol_id.as_ref());
let behaviour = {
let best_number = info.best_number;
let best_hash = info.best_hash;
let genesis_hash = info.genesis_hash;
let block_announces_handshake =
BlockAnnouncesHandshake::<B>::build(roles, best_number, best_hash, genesis_hash)
.encode();
let sync_protocol_config = notifications::ProtocolConfig {
name: block_announces_protocol.into(),
fallback_names: iter::once(legacy_ba_protocol_name.into()).collect(),
handshake: block_announces_handshake,
max_notification_size: MAX_BLOCK_ANNOUNCE_SIZE,
};
Notifications::new(
peerset,
iter::once(sync_protocol_config).chain(
network_config.extra_sets.iter().zip(notifications_protocols_handshakes).map(
|(s, hs)| notifications::ProtocolConfig {
name: s.notifications_protocol.clone(),
fallback_names: s.fallback_names.clone(),
handshake: hs,
max_notification_size: s.max_notification_size,
},
),
),
// NOTE: Block announcement protocol is still very much hardcoded into `Protocol`.
// This protocol must be the first notification protocol given to
// `Notifications`
iter::once(notifications::ProtocolConfig {
name: block_announces_protocol.notifications_protocol.clone(),
fallback_names: block_announces_protocol.fallback_names.clone(),
handshake: block_announces_protocol.handshake.as_ref().unwrap().to_vec(),
max_notification_size: block_announces_protocol.max_notification_size,
})
.chain(network_config.extra_sets.iter().map(|s| notifications::ProtocolConfig {
name: s.notifications_protocol.clone(),
fallback_names: s.fallback_names.clone(),
handshake: s.handshake.as_ref().map_or(roles.encode(), |h| (*h).to_vec()),
max_notification_size: s.max_notification_size,
})),
)
};
@@ -437,10 +380,8 @@ where
},
peerset_handle: peerset_handle.clone(),
behaviour,
notification_protocols: network_config
.extra_sets
.iter()
.map(|s| s.notifications_protocol.clone())
notification_protocols: iter::once(block_announces_protocol.notifications_protocol)
.chain(network_config.extra_sets.iter().map(|s| s.notifications_protocol.clone()))
.collect(),
bad_handshake_substreams: Default::default(),
metrics: if let Some(r) = metrics_registry {
@@ -469,10 +410,7 @@ where
pub fn disconnect_peer(&mut self, peer_id: &PeerId, protocol_name: ProtocolName) {
if let Some(position) = self.notification_protocols.iter().position(|p| *p == protocol_name)
{
self.behaviour.disconnect_peer(
peer_id,
sc_peerset::SetId::from(position + NUM_HARDCODED_PEERSETS),
);
self.behaviour.disconnect_peer(peer_id, sc_peerset::SetId::from(position));
} else {
warn!(target: "sub-libp2p", "disconnect_peer() with invalid protocol name")
}
@@ -1095,8 +1033,7 @@ where
/// Sets the list of reserved peers for the given protocol/peerset.
pub fn set_reserved_peerset_peers(&self, protocol: ProtocolName, peers: HashSet<PeerId>) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.peerset_handle
.set_reserved_peers(sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS), peers);
self.peerset_handle.set_reserved_peers(sc_peerset::SetId::from(index), peers);
} else {
error!(
target: "sub-libp2p",
@@ -1109,10 +1046,7 @@ where
/// Removes a `PeerId` from the list of reserved peers.
pub fn remove_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.peerset_handle.remove_reserved_peer(
sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS),
peer,
);
self.peerset_handle.remove_reserved_peer(sc_peerset::SetId::from(index), peer);
} else {
error!(
target: "sub-libp2p",
@@ -1125,8 +1059,7 @@ where
/// Adds a `PeerId` to the list of reserved peers.
pub fn add_set_reserved_peer(&self, protocol: ProtocolName, peer: PeerId) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.peerset_handle
.add_reserved_peer(sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS), peer);
self.peerset_handle.add_reserved_peer(sc_peerset::SetId::from(index), peer);
} else {
error!(
target: "sub-libp2p",
@@ -1148,8 +1081,7 @@ where
/// Add a peer to a peers set.
pub fn add_to_peers_set(&self, protocol: ProtocolName, peer: PeerId) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.peerset_handle
.add_to_peers_set(sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS), peer);
self.peerset_handle.add_to_peers_set(sc_peerset::SetId::from(index), peer);
} else {
error!(
target: "sub-libp2p",
@@ -1162,10 +1094,7 @@ where
/// Remove a peer from a peers set.
pub fn remove_from_peers_set(&self, protocol: ProtocolName, peer: PeerId) {
if let Some(index) = self.notification_protocols.iter().position(|p| *p == protocol) {
self.peerset_handle.remove_from_peers_set(
sc_peerset::SetId::from(index + NUM_HARDCODED_PEERSETS),
peer,
);
self.peerset_handle.remove_from_peers_set(sc_peerset::SetId::from(index), peer);
} else {
error!(
target: "sub-libp2p",
@@ -1627,14 +1556,12 @@ where
}
} else {
match (
message::Roles::decode_all(&mut &received_handshake[..]),
Roles::decode_all(&mut &received_handshake[..]),
self.peers.get(&peer_id),
) {
(Ok(roles), _) => CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols
[usize::from(set_id) - NUM_HARDCODED_PEERSETS]
.clone(),
protocol: self.notification_protocols[usize::from(set_id)].clone(),
negotiated_fallback,
roles,
notifications_sink,
@@ -1646,9 +1573,7 @@ where
// TODO: remove this after https://github.com/paritytech/substrate/issues/5685
CustomMessageOutcome::NotificationStreamOpened {
remote: peer_id,
protocol: self.notification_protocols
[usize::from(set_id) - NUM_HARDCODED_PEERSETS]
.clone(),
protocol: self.notification_protocols[usize::from(set_id)].clone(),
negotiated_fallback,
roles: peer.info.roles,
notifications_sink,
@@ -1672,9 +1597,7 @@ where
} else {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
protocol: self.notification_protocols
[usize::from(set_id) - NUM_HARDCODED_PEERSETS]
.clone(),
protocol: self.notification_protocols[usize::from(set_id)].clone(),
notifications_sink,
}
},
@@ -1699,9 +1622,7 @@ where
} else {
CustomMessageOutcome::NotificationStreamClosed {
remote: peer_id,
protocol: self.notification_protocols
[usize::from(set_id) - NUM_HARDCODED_PEERSETS]
.clone(),
protocol: self.notification_protocols[usize::from(set_id)].clone(),
}
}
},
@@ -1734,9 +1655,7 @@ where
_ if self.bad_handshake_substreams.contains(&(peer_id, set_id)) =>
CustomMessageOutcome::None,
_ => {
let protocol_name = self.notification_protocols
[usize::from(set_id) - NUM_HARDCODED_PEERSETS]
.clone();
let protocol_name = self.notification_protocols[usize::from(set_id)].clone();
CustomMessageOutcome::NotificationsReceived {
remote: peer_id,
messages: vec![(protocol_name, message.freeze())],
@@ -21,7 +21,7 @@
pub use self::generic::{
RemoteCallRequest, RemoteChangesRequest, RemoteChangesResponse, RemoteHeaderRequest,
RemoteHeaderResponse, RemoteReadChildRequest, RemoteReadRequest, Roles,
RemoteHeaderResponse, RemoteReadChildRequest, RemoteReadRequest,
};
use codec::{Decode, Encode};
use sc_client_api::StorageProof;
@@ -57,11 +57,11 @@ pub struct RemoteReadResponse {
/// Generic types.
pub mod generic {
use super::{RemoteCallResponse, RemoteReadResponse};
use bitflags::bitflags;
use codec::{Decode, Encode, Input, Output};
use codec::{Decode, Encode, Input};
use sc_client_api::StorageProof;
use sc_network_common::{
message::RequestId,
protocol::role::Roles,
sync::message::{
generic::{BlockRequest, BlockResponse},
BlockAnnounce,
@@ -69,60 +69,6 @@ pub mod generic {
};
use sp_runtime::ConsensusEngineId;
bitflags! {
/// Bitmask of the roles that a node fulfills.
pub struct Roles: u8 {
/// No network.
const NONE = 0b00000000;
/// Full node, does not participate in consensus.
const FULL = 0b00000001;
/// Light client node.
const LIGHT = 0b00000010;
/// Act as an authority
const AUTHORITY = 0b00000100;
}
}
impl Roles {
/// Does this role represents a client that holds full chain data locally?
pub fn is_full(&self) -> bool {
self.intersects(Self::FULL | Self::AUTHORITY)
}
/// Does this role represents a client that does not participates in the consensus?
pub fn is_authority(&self) -> bool {
*self == Self::AUTHORITY
}
/// Does this role represents a client that does not hold full chain data locally?
pub fn is_light(&self) -> bool {
!self.is_full()
}
}
impl<'a> From<&'a crate::config::Role> for Roles {
fn from(roles: &'a crate::config::Role) -> Self {
match roles {
crate::config::Role::Full => Self::FULL,
crate::config::Role::Authority { .. } => Self::AUTHORITY,
}
}
}
impl codec::Encode for Roles {
fn encode_to<T: Output + ?Sized>(&self, dest: &mut T) {
dest.push_byte(self.bits())
}
}
impl codec::EncodeLike for Roles {}
impl codec::Decode for Roles {
fn decode<I: Input>(input: &mut I) -> Result<Self, codec::Error> {
Self::from_bits(input.read_byte()?).ok_or_else(|| codec::Error::from("Invalid bytes"))
}
}
/// Consensus is mostly opaque to us
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
pub struct ConsensusMessage {
+2 -12
View File
@@ -34,14 +34,10 @@ use crate::{
network_state::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
protocol::{
self, message::generic::Roles, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol,
Ready,
},
protocol::{self, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready},
transport, ReputationChange,
};
use codec::Encode as _;
use futures::{channel::oneshot, prelude::*};
use libp2p::{
core::{either::EitherError, upgrade, ConnectedPoint, Executor},
@@ -222,19 +218,13 @@ where
local_peer_id.to_base58(),
);
let default_notif_handshake_message = Roles::from(&params.role).encode();
let (protocol, peerset_handle, mut known_addresses) = Protocol::new(
From::from(&params.role),
params.chain.clone(),
params.protocol_id.clone(),
&params.fork_id,
&params.network_config,
(0..params.network_config.extra_sets.len())
.map(|_| default_notif_handshake_message.clone())
.collect(),
params.metrics_registry.as_ref(),
params.chain_sync,
params.block_announce_config,
)?;
// List of multiaddresses that we know in the network.
+107 -3
View File
@@ -20,10 +20,15 @@ use crate::{config, NetworkService, NetworkWorker};
use futures::prelude::*;
use libp2p::PeerId;
use sc_client_api::{BlockBackend, HeaderBackend};
use sc_network_common::{
config::{MultiaddrWithPeerId, NonDefaultSetConfig, ProtocolId, SetConfig, TransportConfig},
protocol::event::Event,
config::{
MultiaddrWithPeerId, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake,
ProtocolId, SetConfig, TransportConfig,
},
protocol::{event::Event, role::Roles},
service::{NetworkEventStream, NetworkNotification, NetworkPeers, NetworkStateInfo},
sync::message::BlockAnnouncesHandshake,
};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
use sc_network_sync::{
@@ -31,7 +36,7 @@ use sc_network_sync::{
ChainSync,
};
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use sp_runtime::traits::{Block as BlockT, Header as _};
use sp_runtime::traits::{Block as BlockT, Header as _, Zero};
use std::{sync::Arc, time::Duration};
use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _};
@@ -132,7 +137,33 @@ fn build_test_full_node(
None,
)
.unwrap();
let block_announce_config = NonDefaultSetConfig {
notifications_protocol: BLOCK_ANNOUNCE_PROTO_NAME.into(),
fallback_names: vec![],
max_notification_size: 1024 * 1024,
handshake: Some(NotificationHandshake::new(BlockAnnouncesHandshake::<
substrate_test_runtime_client::runtime::Block,
>::build(
Roles::from(&config::Role::Full),
client.info().best_number,
client.info().best_hash,
client
.block_hash(Zero::zero())
.ok()
.flatten()
.expect("Genesis block exists; qed"),
))),
set_config: SetConfig {
in_peers: 0,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
};
let worker = NetworkWorker::new(config::Params {
block_announce_config,
role: config::Role::Full,
executor: None,
network_config,
@@ -161,6 +192,7 @@ fn build_test_full_node(
(service, event_stream)
}
const BLOCK_ANNOUNCE_PROTO_NAME: &str = "/block-announces";
const PROTOCOL_NAME: &str = "/foo";
/// Builds two nodes and their associated events stream.
@@ -178,6 +210,7 @@ fn build_nodes_one_proto() -> (
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: Default::default(),
}],
listen_addresses: vec![listen_addr.clone()],
@@ -190,6 +223,7 @@ fn build_nodes_one_proto() -> (
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
@@ -368,6 +402,7 @@ fn lots_of_incoming_peers_works() {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: SetConfig { in_peers: u32::MAX, ..Default::default() },
}],
transport: TransportConfig::MemoryOnly,
@@ -387,6 +422,7 @@ fn lots_of_incoming_peers_works() {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr.clone(),
@@ -504,6 +540,7 @@ fn fallback_name_working() {
notifications_protocol: NEW_PROTOCOL_NAME.into(),
fallback_names: vec![PROTOCOL_NAME.into()],
max_notification_size: 1024 * 1024,
handshake: None,
set_config: Default::default(),
}],
listen_addresses: vec![listen_addr.clone()],
@@ -516,6 +553,7 @@ fn fallback_name_working() {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
@@ -561,6 +599,72 @@ fn fallback_name_working() {
});
}
// Disconnect peer by calling `Protocol::disconnect_peer()` with the supplied block announcement
// protocol name and verify that `SyncDisconnected` event is emitted
#[async_std::test]
async fn disconnect_sync_peer_using_block_announcement_protocol_name() {
let listen_addr = config::build_multiaddr![Memory(rand::random::<u64>())];
let (node1, mut events_stream1) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: vec![],
max_notification_size: 1024 * 1024,
handshake: None,
set_config: Default::default(),
}],
listen_addresses: vec![listen_addr.clone()],
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
let (node2, mut events_stream2) = build_test_full_node(config::NetworkConfiguration {
extra_sets: vec![NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME.into(),
fallback_names: Vec::new(),
max_notification_size: 1024 * 1024,
handshake: None,
set_config: SetConfig {
reserved_nodes: vec![MultiaddrWithPeerId {
multiaddr: listen_addr,
peer_id: node1.local_peer_id(),
}],
..Default::default()
},
}],
listen_addresses: vec![],
transport: TransportConfig::MemoryOnly,
..config::NetworkConfiguration::new_local()
});
loop {
match events_stream1.next().await.unwrap() {
Event::NotificationStreamOpened { .. } => break,
_ => {},
};
}
loop {
match events_stream2.next().await.unwrap() {
Event::NotificationStreamOpened { .. } => break,
_ => {},
};
}
// disconnect peer using `PROTOCOL_NAME`, verify `NotificationStreamClosed` event is emitted
node2.disconnect_peer(node1.local_peer_id(), PROTOCOL_NAME.into());
assert!(std::matches!(
events_stream2.next().await,
Some(Event::NotificationStreamClosed { .. })
));
let _ = events_stream2.next().await; // ignore the reopen event
// now disconnect using the block announcement protocol, verify that `SyncDisconnected` is
// emitted
node2.disconnect_peer(node1.local_peer_id(), BLOCK_ANNOUNCE_PROTO_NAME.into());
assert!(std::matches!(events_stream2.next().await, Some(Event::SyncDisconnected { .. })));
}
#[test]
#[should_panic(expected = "don't match the transport")]
fn ensure_listen_addresses_consistent_with_transport_memory() {