split NetworkBridge into two subsystems (#5616)

* foo

* rolling session window

* fixup

* remove use statemetn

* fmt

* split NetworkBridge into two subsystems

Pending cleanup

* split

* chore: reexport OrchestraError as OverseerError

* chore: silence warnings

* fixup tests

* chore: add default timenout of 30s to subsystem test helper ctx handle

* single item channel

* fixins

* fmt

* cleanup

* remove dead code

* remove sync bounds again

* wire up shared state

* deal with some FIXMEs

* use distinct tags

Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>

* use tag

Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>

* address naming

tx and rx are common in networking and also have an implicit meaning regarding networking
compared to incoming and outgoing which are already used with subsystems themselvesq

* remove unused sync oracle

* remove unneeded state

* fix tests

* chore: fmt

* do not try to register twice

* leak Metrics type

Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com>
Co-authored-by: Andronik <write@reusable.software>
This commit is contained in:
Bernhard Schuster
2022-07-12 18:22:36 +02:00
committed by GitHub
parent c11c1f38f4
commit 3240cb5e4d
40 changed files with 1880 additions and 1429 deletions
+2
View File
@@ -19,6 +19,8 @@ polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-subsystem-util = { path = "../../subsystem-util"}
parking_lot = "0.12.0"
bytes = "1"
fatality = "0.0.6"
thiserror = "1"
[dev-dependencies]
assert_matches = "1.4.0"
@@ -0,0 +1,20 @@
use polkadot_node_subsystem::SubsystemError;
pub(crate) use polkadot_overseer::OverseerError;
#[fatality::fatality(splitable)]
pub(crate) enum Error {
/// Received error from overseer:
#[fatal]
#[error(transparent)]
SubsystemError(#[from] SubsystemError),
/// The stream of incoming events concluded.
#[fatal]
#[error("Event stream closed unexpectedly")]
EventStreamConcluded,
}
impl From<OverseerError> for Error {
fn from(e: OverseerError) -> Self {
Error::SubsystemError(SubsystemError::from(e))
}
}
File diff suppressed because it is too large Load Diff
@@ -26,6 +26,7 @@ fn peer_set_label(peer_set: PeerSet, version: ProtocolVersion) -> &'static str {
peer_set.get_protocol_name_static(version).unwrap_or("<internal error>")
}
#[allow(missing_docs)]
impl Metrics {
pub fn on_peer_connected(&self, peer_set: PeerSet, version: ProtocolVersion) {
self.0.as_ref().map(|metrics| {
+2 -1
View File
@@ -35,7 +35,8 @@ use polkadot_primitives::v2::{AuthorityDiscoveryId, Block, Hash};
use crate::validator_discovery::AuthorityDiscovery;
use super::LOG_TARGET;
// network bridge network abstraction log target
const LOG_TARGET: &'static str = "parachain::network-bridge-net";
/// Send a message to the network.
///
+874
View File
@@ -0,0 +1,874 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Network Bridge Subsystem - handles _incoming_ messages from the network, forwarded to the relevant subsystems.
use super::*;
use always_assert::never;
use bytes::Bytes;
use futures::stream::BoxStream;
use parity_scale_codec::{Decode, DecodeAll};
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;
use polkadot_node_network_protocol::{
self as net_protocol,
peer_set::{PeerSet, PerPeerSet},
v1 as protocol_v1, ObservedRole, OurView, PeerId, ProtocolVersion,
UnifiedReputationChange as Rep, View,
};
use polkadot_node_subsystem::{
errors::SubsystemError,
messages::{
network_bridge_event::{NewGossipTopology, TopologyPeerInfo},
ApprovalDistributionMessage, BitfieldDistributionMessage, CollatorProtocolMessage,
GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage,
StatementDistributionMessage,
},
overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
};
use polkadot_primitives::v2::{AuthorityDiscoveryId, BlockNumber, Hash, ValidatorIndex};
/// Peer set info for network initialization.
///
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use std::{
collections::{hash_map, HashMap},
iter::ExactSizeIterator,
};
use super::validator_discovery;
/// Actual interfacing to the network based on the `Network` trait.
///
/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
use crate::network::{send_message, Network};
use crate::network::get_peer_id_by_authority_id;
use super::metrics::Metrics;
#[cfg(test)]
mod tests;
// network bridge log target
const LOG_TARGET: &'static str = "parachain::network-bridge-rx";
/// The network bridge subsystem - network receiving side.
pub struct NetworkBridgeRx<N, AD> {
/// `Network` trait implementing type.
network_service: N,
authority_discovery_service: AD,
sync_oracle: Box<dyn SyncOracle + Send>,
shared: Shared,
metrics: Metrics,
}
impl<N, AD> NetworkBridgeRx<N, AD> {
/// Create a new network bridge subsystem with underlying network service and authority discovery service.
///
/// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`peers_sets_info`](peers_sets_info).
pub fn new(
network_service: N,
authority_discovery_service: AD,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
) -> Self {
let shared = Shared::default();
Self { network_service, authority_discovery_service, sync_oracle, shared, metrics }
}
}
#[overseer::subsystem(NetworkBridgeRx, error = SubsystemError, prefix = self::overseer)]
impl<Net, AD, Context> NetworkBridgeRx<Net, AD>
where
Net: Network + Sync,
AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
{
fn start(mut self, ctx: Context) -> SpawnedSubsystem {
// The stream of networking events has to be created at initialization, otherwise the
// networking might open connections before the stream of events has been grabbed.
let network_stream = self.network_service.event_stream();
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
let future = run_network_in(self, ctx, network_stream)
.map_err(|e| SubsystemError::with_origin("network-bridge", e))
.boxed();
SpawnedSubsystem { name: "network-bridge-subsystem", future }
}
}
async fn update_gossip_peers_1d<AD, N>(
ads: &mut AD,
neighbors: N,
) -> HashMap<AuthorityDiscoveryId, TopologyPeerInfo>
where
AD: validator_discovery::AuthorityDiscovery,
N: IntoIterator<Item = (AuthorityDiscoveryId, ValidatorIndex)>,
N::IntoIter: std::iter::ExactSizeIterator,
{
let neighbors = neighbors.into_iter();
let mut peers = HashMap::with_capacity(neighbors.len());
for (authority, validator_index) in neighbors {
let addr = get_peer_id_by_authority_id(ads, authority.clone()).await;
if let Some(peer_id) = addr {
peers.insert(authority, TopologyPeerInfo { peer_ids: vec![peer_id], validator_index });
}
}
peers
}
async fn handle_network_messages<AD>(
mut sender: impl overseer::NetworkBridgeRxSenderTrait,
mut network_service: impl Network,
network_stream: BoxStream<'static, NetworkEvent>,
mut authority_discovery_service: AD,
metrics: Metrics,
shared: Shared,
) -> Result<(), Error>
where
AD: validator_discovery::AuthorityDiscovery + Send,
{
let mut network_stream = network_stream.fuse();
loop {
match network_stream.next().await {
None => return Err(Error::EventStreamConcluded),
Some(NetworkEvent::Dht(_)) |
Some(NetworkEvent::SyncConnected { .. }) |
Some(NetworkEvent::SyncDisconnected { .. }) => {},
Some(NetworkEvent::NotificationStreamOpened {
remote: peer,
protocol,
role,
negotiated_fallback,
}) => {
let role = ObservedRole::from(role);
let (peer_set, version) = {
let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) {
None => continue,
Some(p) => p,
};
if let Some(fallback) = negotiated_fallback {
match PeerSet::try_from_protocol_name(&fallback) {
None => {
gum::debug!(
target: LOG_TARGET,
fallback = &*fallback,
?peer,
?peer_set,
"Unknown fallback",
);
continue
},
Some((p2, v2)) => {
if p2 != peer_set {
gum::debug!(
target: LOG_TARGET,
fallback = &*fallback,
fallback_peerset = ?p2,
protocol = &*protocol,
peerset = ?peer_set,
"Fallback mismatched peer-set",
);
continue
}
(p2, v2)
},
}
} else {
(peer_set, version)
}
};
gum::debug!(
target: LOG_TARGET,
action = "PeerConnected",
peer_set = ?peer_set,
version,
peer = ?peer,
role = ?role
);
let local_view = {
let mut shared = shared.0.lock();
let peer_map = match peer_set {
PeerSet::Validation => &mut shared.validation_peers,
PeerSet::Collation => &mut shared.collation_peers,
};
match peer_map.entry(peer.clone()) {
hash_map::Entry::Occupied(_) => continue,
hash_map::Entry::Vacant(vacant) => {
vacant.insert(PeerData { view: View::default(), version });
},
}
metrics.on_peer_connected(peer_set, version);
metrics.note_peer_count(peer_set, version, peer_map.len());
shared.local_view.clone().unwrap_or(View::default())
};
let maybe_authority =
authority_discovery_service.get_authority_ids_by_peer_id(peer).await;
match peer_set {
PeerSet::Validation => {
dispatch_validation_events_to_all(
vec![
NetworkBridgeEvent::PeerConnected(
peer.clone(),
role,
1,
maybe_authority,
),
NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()),
],
&mut sender,
)
.await;
send_message(
&mut network_service,
vec![peer],
PeerSet::Validation,
version,
WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(local_view),
&metrics,
);
},
PeerSet::Collation => {
dispatch_collation_events_to_all(
vec![
NetworkBridgeEvent::PeerConnected(
peer.clone(),
role,
1,
maybe_authority,
),
NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()),
],
&mut sender,
)
.await;
send_message(
&mut network_service,
vec![peer],
PeerSet::Collation,
version,
WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(local_view),
&metrics,
);
},
}
},
Some(NetworkEvent::NotificationStreamClosed { remote: peer, protocol }) => {
let (peer_set, version) = match PeerSet::try_from_protocol_name(&protocol) {
None => continue,
Some(peer_set) => peer_set,
};
gum::debug!(
target: LOG_TARGET,
action = "PeerDisconnected",
peer_set = ?peer_set,
peer = ?peer
);
let was_connected = {
let mut shared = shared.0.lock();
let peer_map = match peer_set {
PeerSet::Validation => &mut shared.validation_peers,
PeerSet::Collation => &mut shared.collation_peers,
};
let w = peer_map.remove(&peer).is_some();
metrics.on_peer_disconnected(peer_set, version);
metrics.note_peer_count(peer_set, version, peer_map.len());
w
};
if was_connected && version == peer_set.get_default_version() {
match peer_set {
PeerSet::Validation =>
dispatch_validation_event_to_all(
NetworkBridgeEvent::PeerDisconnected(peer),
&mut sender,
)
.await,
PeerSet::Collation =>
dispatch_collation_event_to_all(
NetworkBridgeEvent::PeerDisconnected(peer),
&mut sender,
)
.await,
}
}
},
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
let expected_versions = {
let mut versions = PerPeerSet::<Option<ProtocolVersion>>::default();
let shared = shared.0.lock();
if let Some(peer_data) = shared.validation_peers.get(&remote) {
versions[PeerSet::Validation] = Some(peer_data.version);
}
if let Some(peer_data) = shared.collation_peers.get(&remote) {
versions[PeerSet::Collation] = Some(peer_data.version);
}
versions
};
// non-decoded, but version-checked validation messages.
let v_messages: Result<Vec<_>, _> = messages
.iter()
.filter_map(|(protocol, msg_bytes)| {
// version doesn't matter because we always receive on the 'correct'
// protocol name, not the negotiated fallback.
let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?;
if peer_set == PeerSet::Validation {
if expected_versions[PeerSet::Validation].is_none() {
return Some(Err(UNCONNECTED_PEERSET_COST))
}
Some(Ok(msg_bytes.clone()))
} else {
None
}
})
.collect();
let v_messages = match v_messages {
Err(rep) => {
gum::debug!(target: LOG_TARGET, action = "ReportPeer");
network_service.report_peer(remote, rep);
continue
},
Ok(v) => v,
};
// non-decoded, but version-checked colldation messages.
let c_messages: Result<Vec<_>, _> = messages
.iter()
.filter_map(|(protocol, msg_bytes)| {
// version doesn't matter because we always receive on the 'correct'
// protocol name, not the negotiated fallback.
let (peer_set, _version) = PeerSet::try_from_protocol_name(protocol)?;
if peer_set == PeerSet::Collation {
if expected_versions[PeerSet::Collation].is_none() {
return Some(Err(UNCONNECTED_PEERSET_COST))
}
Some(Ok(msg_bytes.clone()))
} else {
None
}
})
.collect();
let c_messages = match c_messages {
Err(rep) => {
gum::debug!(target: LOG_TARGET, action = "ReportPeer");
network_service.report_peer(remote, rep);
continue
},
Ok(v) => v,
};
if v_messages.is_empty() && c_messages.is_empty() {
continue
}
gum::trace!(
target: LOG_TARGET,
action = "PeerMessages",
peer = ?remote,
num_validation_messages = %v_messages.len(),
num_collation_messages = %c_messages.len()
);
if !v_messages.is_empty() {
let (events, reports) =
if expected_versions[PeerSet::Validation] == Some(1) {
handle_v1_peer_messages::<protocol_v1::ValidationProtocol, _>(
remote.clone(),
PeerSet::Validation,
&mut shared.0.lock().validation_peers,
v_messages,
&metrics,
)
} else {
gum::warn!(
target: LOG_TARGET,
version = ?expected_versions[PeerSet::Validation],
"Major logic bug. Peer somehow has unsupported validation protocol version."
);
never!("Only version 1 is supported; peer set connection checked above; qed");
// If a peer somehow triggers this, we'll disconnect them
// eventually.
(Vec::new(), vec![UNCONNECTED_PEERSET_COST])
};
for report in reports {
network_service.report_peer(remote.clone(), report);
}
dispatch_validation_events_to_all(events, &mut sender).await;
}
if !c_messages.is_empty() {
let (events, reports) =
if expected_versions[PeerSet::Collation] == Some(1) {
handle_v1_peer_messages::<protocol_v1::CollationProtocol, _>(
remote.clone(),
PeerSet::Collation,
&mut shared.0.lock().collation_peers,
c_messages,
&metrics,
)
} else {
gum::warn!(
target: LOG_TARGET,
version = ?expected_versions[PeerSet::Collation],
"Major logic bug. Peer somehow has unsupported collation protocol version."
);
never!("Only version 1 is supported; peer set connection checked above; qed");
// If a peer somehow triggers this, we'll disconnect them
// eventually.
(Vec::new(), vec![UNCONNECTED_PEERSET_COST])
};
for report in reports {
network_service.report_peer(remote.clone(), report);
}
dispatch_collation_events_to_all(events, &mut sender).await;
}
},
}
}
}
#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
async fn run_incoming_orchestra_signals<Context, N, AD>(
mut ctx: Context,
mut network_service: N,
mut authority_discovery_service: AD,
shared: Shared,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
) -> Result<(), Error>
where
N: Network,
AD: validator_discovery::AuthorityDiscovery + Clone,
{
// This is kept sorted, descending, by block number.
let mut live_heads: Vec<ActivatedLeaf> = Vec::with_capacity(MAX_VIEW_HEADS);
let mut finalized_number = 0;
let mut mode = Mode::Syncing(sync_oracle);
loop {
match ctx.recv().fuse().await? {
FromOrchestra::Communication {
msg:
NetworkBridgeRxMessage::NewGossipTopology {
session,
our_neighbors_x,
our_neighbors_y,
},
} => {
gum::debug!(
target: LOG_TARGET,
action = "NewGossipTopology",
neighbors_x = our_neighbors_x.len(),
neighbors_y = our_neighbors_y.len(),
"Gossip topology has changed",
);
let gossip_peers_x =
update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_x).await;
let gossip_peers_y =
update_gossip_peers_1d(&mut authority_discovery_service, our_neighbors_y).await;
dispatch_validation_event_to_all_unbounded(
NetworkBridgeEvent::NewGossipTopology(NewGossipTopology {
session,
our_neighbors_x: gossip_peers_x,
our_neighbors_y: gossip_peers_y,
}),
ctx.sender(),
);
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(active_leaves)) => {
let ActiveLeavesUpdate { activated, deactivated } = active_leaves;
gum::trace!(
target: LOG_TARGET,
action = "ActiveLeaves",
has_activated = activated.is_some(),
num_deactivated = %deactivated.len(),
);
for activated in activated {
let pos = live_heads
.binary_search_by(|probe| probe.number.cmp(&activated.number).reverse())
.unwrap_or_else(|i| i);
live_heads.insert(pos, activated);
}
live_heads.retain(|h| !deactivated.contains(&h.hash));
// if we're done syncing, set the mode to `Mode::Active`.
// Otherwise, we don't need to send view updates.
{
let is_done_syncing = match mode {
Mode::Active => true,
Mode::Syncing(ref mut sync_oracle) => !sync_oracle.is_major_syncing(),
};
if is_done_syncing {
mode = Mode::Active;
update_our_view(
&mut network_service,
&mut ctx,
&live_heads,
&shared,
finalized_number,
&metrics,
);
}
}
},
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, action = "BlockFinalized");
debug_assert!(finalized_number < number);
// we don't send the view updates here, but delay them until the next `ActiveLeaves`
// otherwise it might break assumptions of some of the subsystems
// that we never send the same `ActiveLeavesUpdate`
finalized_number = number;
},
}
}
}
/// Main driver, processing network events and overseer signals.
///
/// THIS IS A HACK. We need to ensure we never hold the mutex across an `.await` boundary
/// and `parking_lot` currently does not provide `Send`, which helps us enforce that.
/// If this breaks, we need to find another way to protect ourselves.
///
/// ```compile_fail
/// #use parking_lot::MutexGuard;
/// #fn is_send<T: Send>();
/// #is_send::<parking_lot::MutexGuard<'static, ()>();
/// ```
#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
async fn run_network_in<N, AD, Context>(
bridge: NetworkBridgeRx<N, AD>,
mut ctx: Context,
network_stream: BoxStream<'static, NetworkEvent>,
) -> Result<(), Error>
where
N: Network,
AD: validator_discovery::AuthorityDiscovery + Clone,
{
let NetworkBridgeRx {
network_service,
authority_discovery_service,
metrics,
sync_oracle,
shared,
} = bridge;
let (task, network_event_handler) = handle_network_messages(
ctx.sender().clone(),
network_service.clone(),
network_stream,
authority_discovery_service.clone(),
metrics.clone(),
shared.clone(),
)
.remote_handle();
ctx.spawn("network-bridge-in-network-worker", Box::pin(task))?;
futures::pin_mut!(network_event_handler);
let orchestra_signal_handler = run_incoming_orchestra_signals(
ctx,
network_service,
authority_discovery_service,
shared,
sync_oracle,
metrics,
);
futures::pin_mut!(orchestra_signal_handler);
futures::future::select(orchestra_signal_handler, network_event_handler)
.await
.factor_first()
.0?;
Ok(())
}
fn construct_view(
live_heads: impl DoubleEndedIterator<Item = Hash>,
finalized_number: BlockNumber,
) -> View {
View::new(live_heads.take(MAX_VIEW_HEADS), finalized_number)
}
#[overseer::contextbounds(NetworkBridgeRx, prefix = self::overseer)]
fn update_our_view<Net, Context>(
net: &mut Net,
ctx: &mut Context,
live_heads: &[ActivatedLeaf],
shared: &Shared,
finalized_number: BlockNumber,
metrics: &Metrics,
) where
Net: Network,
{
let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);
let (validation_peers, collation_peers) = {
let mut shared = shared.0.lock();
// We only want to send a view update when the heads changed.
// A change in finalized block number only is _not_ sufficient.
//
// If this is the first view update since becoming active, but our view is empty,
// there is no need to send anything.
match shared.local_view {
Some(ref v) if v.check_heads_eq(&new_view) => return,
None if live_heads.is_empty() => {
shared.local_view = Some(new_view);
return
},
_ => {
shared.local_view = Some(new_view.clone());
},
}
(
shared.validation_peers.keys().cloned().collect::<Vec<_>>(),
shared.collation_peers.keys().cloned().collect::<Vec<_>>(),
)
};
send_validation_message_v1(
net,
validation_peers,
WireMessage::ViewUpdate(new_view.clone()),
metrics,
);
send_collation_message_v1(net, collation_peers, WireMessage::ViewUpdate(new_view), metrics);
let our_view = OurView::new(
live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)),
finalized_number,
);
dispatch_validation_event_to_all_unbounded(
NetworkBridgeEvent::OurViewChange(our_view.clone()),
ctx.sender(),
);
dispatch_collation_event_to_all_unbounded(
NetworkBridgeEvent::OurViewChange(our_view),
ctx.sender(),
);
}
// Handle messages on a specific v1 peer-set. The peer is expected to be connected on that
// peer-set.
fn handle_v1_peer_messages<RawMessage: Decode, OutMessage: From<RawMessage>>(
peer: PeerId,
peer_set: PeerSet,
peers: &mut HashMap<PeerId, PeerData>,
messages: Vec<Bytes>,
metrics: &Metrics,
) -> (Vec<NetworkBridgeEvent<OutMessage>>, Vec<Rep>) {
let peer_data = match peers.get_mut(&peer) {
None => return (Vec::new(), vec![UNCONNECTED_PEERSET_COST]),
Some(d) => d,
};
let mut outgoing_events = Vec::with_capacity(messages.len());
let mut reports = Vec::new();
for message in messages {
metrics.on_notification_received(peer_set, peer_data.version, message.len());
let message = match WireMessage::<RawMessage>::decode_all(&mut message.as_ref()) {
Err(_) => {
reports.push(MALFORMED_MESSAGE_COST);
continue
},
Ok(m) => m,
};
outgoing_events.push(match message {
WireMessage::ViewUpdate(new_view) => {
if new_view.len() > MAX_VIEW_HEADS ||
new_view.finalized_number < peer_data.view.finalized_number
{
reports.push(MALFORMED_VIEW_COST);
continue
} else if new_view.is_empty() {
reports.push(EMPTY_VIEW_COST);
continue
} else if new_view == peer_data.view {
continue
} else {
peer_data.view = new_view;
NetworkBridgeEvent::PeerViewChange(peer.clone(), peer_data.view.clone())
}
},
WireMessage::ProtocolMessage(message) =>
NetworkBridgeEvent::PeerMessage(peer.clone(), message.into()),
})
}
(outgoing_events, reports)
}
fn send_validation_message_v1(
net: &mut impl Network,
peers: Vec<PeerId>,
message: WireMessage<protocol_v1::ValidationProtocol>,
metrics: &Metrics,
) {
send_message(net, peers, PeerSet::Validation, 1, message, metrics);
}
fn send_collation_message_v1(
net: &mut impl Network,
peers: Vec<PeerId>,
message: WireMessage<protocol_v1::CollationProtocol>,
metrics: &Metrics,
) {
send_message(net, peers, PeerSet::Collation, 1, message, metrics)
}
async fn dispatch_validation_event_to_all(
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
) {
dispatch_validation_events_to_all(std::iter::once(event), ctx).await
}
async fn dispatch_collation_event_to_all(
event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
) {
dispatch_collation_events_to_all(std::iter::once(event), ctx).await
}
fn dispatch_validation_event_to_all_unbounded(
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
) {
event
.focus()
.ok()
.map(StatementDistributionMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));
event
.focus()
.ok()
.map(BitfieldDistributionMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));
event
.focus()
.ok()
.map(ApprovalDistributionMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));
event
.focus()
.ok()
.map(GossipSupportMessage::from)
.and_then(|msg| Some(sender.send_unbounded_message(msg)));
}
fn dispatch_collation_event_to_all_unbounded(
event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
) {
if let Ok(msg) = event.focus() {
sender.send_unbounded_message(CollatorProtocolMessage::NetworkBridgeUpdate(msg))
}
}
async fn dispatch_validation_events_to_all<I>(
events: I,
sender: &mut impl overseer::NetworkBridgeRxSenderTrait,
) where
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>>,
I::IntoIter: Send,
{
for event in events {
sender
.send_messages(event.focus().map(StatementDistributionMessage::from))
.await;
sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await;
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await;
sender.send_messages(event.focus().map(GossipSupportMessage::from)).await;
}
}
async fn dispatch_collation_events_to_all<I>(
events: I,
ctx: &mut impl overseer::NetworkBridgeRxSenderTrait,
) where
I: IntoIterator<Item = NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>>,
I::IntoIter: Send,
{
let messages_for = |event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>| {
event.focus().ok().map(|m| CollatorProtocolMessage::NetworkBridgeUpdate(m))
};
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}
@@ -16,6 +16,8 @@
use super::*;
use futures::{channel::oneshot, executor, stream::BoxStream};
use polkadot_node_network_protocol::{self as net_protocol, OurView};
use polkadot_node_subsystem::{messages::NetworkBridgeEvent, ActivatedLeaf};
use assert_matches::assert_matches;
use async_trait::async_trait;
@@ -44,7 +46,7 @@ use polkadot_node_subsystem_test_helpers::{
};
use polkadot_node_subsystem_util::metered;
use polkadot_primitives::v2::AuthorityDiscoveryId;
use polkadot_primitives_test_helpers::dummy_collator_signature;
use sc_network::Multiaddr;
use sp_keyring::Sr25519Keyring;
@@ -214,18 +216,18 @@ fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAc
#[derive(Clone)]
struct TestSyncOracle {
flag: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
done_syncing_sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
struct TestSyncOracleHandle {
done_syncing_receiver: oneshot::Receiver<()>,
flag: Arc<AtomicBool>,
is_major_syncing: Arc<AtomicBool>,
}
impl TestSyncOracleHandle {
fn set_done(&self) {
self.flag.store(false, Ordering::SeqCst);
self.is_major_syncing.store(false, Ordering::SeqCst);
}
async fn await_mode_switch(self) {
@@ -235,7 +237,7 @@ impl TestSyncOracleHandle {
impl SyncOracle for TestSyncOracle {
fn is_major_syncing(&mut self) -> bool {
let is_major_syncing = self.flag.load(Ordering::SeqCst);
let is_major_syncing = self.is_major_syncing.load(Ordering::SeqCst);
if !is_major_syncing {
if let Some(sender) = self.done_syncing_sender.lock().take() {
@@ -252,13 +254,16 @@ impl SyncOracle for TestSyncOracle {
}
// val - result of `is_major_syncing`.
fn make_sync_oracle(val: bool) -> (TestSyncOracle, TestSyncOracleHandle) {
fn make_sync_oracle(is_major_syncing: bool) -> (TestSyncOracle, TestSyncOracleHandle) {
let (tx, rx) = oneshot::channel();
let flag = Arc::new(AtomicBool::new(val));
let is_major_syncing = Arc::new(AtomicBool::new(is_major_syncing));
(
TestSyncOracle { flag: flag.clone(), done_syncing_sender: Arc::new(Mutex::new(Some(tx))) },
TestSyncOracleHandle { flag, done_syncing_receiver: rx },
TestSyncOracle {
is_major_syncing: is_major_syncing.clone(),
done_syncing_sender: Arc::new(Mutex::new(Some(tx))),
},
TestSyncOracleHandle { is_major_syncing, done_syncing_receiver: rx },
)
}
@@ -267,7 +272,7 @@ fn done_syncing_oracle() -> Box<dyn SyncOracle + Send> {
Box::new(oracle)
}
type VirtualOverseer = TestSubsystemContextHandle<NetworkBridgeMessage>;
type VirtualOverseer = TestSubsystemContextHandle<NetworkBridgeRxMessage>;
struct TestHarness {
network_handle: TestNetworkHandle,
@@ -284,14 +289,15 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let network_stream = network.event_stream();
let bridge = NetworkBridge {
let bridge = NetworkBridgeRx {
network_service: network,
authority_discovery_service: discovery,
metrics: Metrics(None),
sync_oracle,
shared: Shared::default(),
};
let network_bridge = run_network(bridge, context, network_stream)
let network_bridge = run_network_in(bridge, context, network_stream)
.map_err(|_| panic!("subsystem execution failed"))
.map(|_| ());
@@ -311,7 +317,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
async fn assert_sends_validation_event_to_all(
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeMessage>,
virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeRxMessage>,
) {
// Ordering must be consistent across:
// `fn dispatch_validation_event_to_all_unbounded`
@@ -347,7 +353,7 @@ async fn assert_sends_validation_event_to_all(
async fn assert_sends_collation_event_to_all(
event: NetworkBridgeEvent<net_protocol::VersionedCollationProtocol>,
virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeMessage>,
virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeRxMessage>,
) {
assert_matches!(
virtual_overseer.recv().await,
@@ -485,6 +491,7 @@ fn do_not_send_view_update_until_synced() {
let peer_a = PeerId::random();
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);
network_handle
.connect_peer(peer_a.clone(), PeerSet::Validation, ObservedRole::Full)
@@ -1082,117 +1089,6 @@ fn view_finalized_number_can_not_go_down() {
});
}
#[test]
fn send_messages_to_peers() {
test_harness(done_syncing_oracle(), |test_harness| async move {
let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;
let peer = PeerId::random();
network_handle
.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full)
.await;
network_handle
.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full)
.await;
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None),
&mut virtual_overseer,
)
.await;
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()),
&mut virtual_overseer,
)
.await;
}
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, 1, None),
&mut virtual_overseer,
)
.await;
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerViewChange(peer.clone(), View::default()),
&mut virtual_overseer,
)
.await;
}
// consume peer view changes
{
let _peer_view_changes = network_handle.next_network_actions(2).await;
}
// send a validation protocol message.
{
let approval_distribution_message =
protocol_v1::ApprovalDistributionMessage::Approvals(Vec::new());
let message_v1 = protocol_v1::ValidationProtocol::ApprovalDistribution(
approval_distribution_message.clone(),
);
virtual_overseer
.send(FromOrchestra::Communication {
msg: NetworkBridgeMessage::SendValidationMessage(
vec![peer.clone()],
Versioned::V1(message_v1.clone()),
),
})
.await;
assert_eq!(
network_handle.next_network_action().await,
NetworkAction::WriteNotification(
peer.clone(),
PeerSet::Validation,
WireMessage::ProtocolMessage(message_v1).encode(),
)
);
}
// send a collation protocol message.
{
let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare(
Sr25519Keyring::Alice.public().into(),
0_u32.into(),
dummy_collator_signature(),
);
let message_v1 =
protocol_v1::CollationProtocol::CollatorProtocol(collator_protocol_message.clone());
virtual_overseer
.send(FromOrchestra::Communication {
msg: NetworkBridgeMessage::SendCollationMessage(
vec![peer.clone()],
Versioned::V1(message_v1.clone()),
),
})
.await;
assert_eq!(
network_handle.next_network_action().await,
NetworkAction::WriteNotification(
peer.clone(),
PeerSet::Collation,
WireMessage::ProtocolMessage(message_v1).encode(),
)
);
}
virtual_overseer
});
}
#[test]
fn our_view_updates_decreasing_order_and_limited_to_max() {
test_harness(done_syncing_oracle(), |test_harness| async move {
+301
View File
@@ -0,0 +1,301 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Network Bridge Subsystem - handles _outgoing_ messages, from subsystem to the network.
use super::*;
use polkadot_node_network_protocol::{peer_set::PeerSet, v1 as protocol_v1, PeerId, Versioned};
use polkadot_node_subsystem::{
errors::SubsystemError, messages::NetworkBridgeTxMessage, overseer, FromOrchestra,
OverseerSignal, SpawnedSubsystem,
};
/// Peer set info for network initialization.
///
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use crate::validator_discovery;
/// Actual interfacing to the network based on the `Network` trait.
///
/// Defines the `Network` trait with an implementation for an `Arc<NetworkService>`.
use crate::network::{send_message, Network};
use crate::metrics::Metrics;
#[cfg(test)]
mod tests;
// network bridge log target
const LOG_TARGET: &'static str = "parachain::network-bridge-tx";
/// The network bridge subsystem.
pub struct NetworkBridgeTx<N, AD> {
/// `Network` trait implementing type.
network_service: N,
authority_discovery_service: AD,
metrics: Metrics,
}
impl<N, AD> NetworkBridgeTx<N, AD> {
/// Create a new network bridge subsystem with underlying network service and authority discovery service.
///
/// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`peers_sets_info`](peers_sets_info).
pub fn new(network_service: N, authority_discovery_service: AD, metrics: Metrics) -> Self {
Self { network_service, authority_discovery_service, metrics }
}
}
#[overseer::subsystem(NetworkBridgeTx, error = SubsystemError, prefix = self::overseer)]
impl<Net, AD, Context> NetworkBridgeTx<Net, AD>
where
Net: Network + Sync,
AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = run_network_out(self, ctx)
.map_err(|e| SubsystemError::with_origin("network-bridge", e))
.boxed();
SpawnedSubsystem { name: "network-bridge-subsystem", future }
}
}
#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
async fn handle_subsystem_messages<Context, N, AD>(
mut ctx: Context,
mut network_service: N,
mut authority_discovery_service: AD,
metrics: Metrics,
) -> Result<(), Error>
where
N: Network,
AD: validator_discovery::AuthorityDiscovery + Clone,
{
let mut validator_discovery = validator_discovery::Service::<N, AD>::new();
loop {
match ctx.recv().fuse().await? {
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Signal(_) => { /* handled by incoming */ },
FromOrchestra::Communication { msg } => {
(network_service, authority_discovery_service) =
handle_incoming_subsystem_communication(
&mut ctx,
network_service,
&mut validator_discovery,
authority_discovery_service.clone(),
msg,
&metrics,
)
.await;
},
}
}
}
#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
async fn handle_incoming_subsystem_communication<Context, N, AD>(
_ctx: &mut Context,
mut network_service: N,
validator_discovery: &mut validator_discovery::Service<N, AD>,
mut authority_discovery_service: AD,
msg: NetworkBridgeTxMessage,
metrics: &Metrics,
) -> (N, AD)
where
N: Network,
AD: validator_discovery::AuthorityDiscovery + Clone,
{
match msg {
NetworkBridgeTxMessage::ReportPeer(peer, rep) => {
if !rep.is_benefit() {
gum::debug!(target: LOG_TARGET, ?peer, ?rep, action = "ReportPeer");
}
metrics.on_report_event();
network_service.report_peer(peer, rep);
},
NetworkBridgeTxMessage::DisconnectPeer(peer, peer_set) => {
gum::trace!(
target: LOG_TARGET,
action = "DisconnectPeer",
?peer,
peer_set = ?peer_set,
);
network_service.disconnect_peer(peer, peer_set);
},
NetworkBridgeTxMessage::SendValidationMessage(peers, msg) => {
gum::trace!(
target: LOG_TARGET,
action = "SendValidationMessages",
num_messages = 1usize,
);
match msg {
Versioned::V1(msg) => send_validation_message_v1(
&mut network_service,
peers,
WireMessage::ProtocolMessage(msg),
&metrics,
),
}
},
NetworkBridgeTxMessage::SendValidationMessages(msgs) => {
gum::trace!(
target: LOG_TARGET,
action = "SendValidationMessages",
num_messages = %msgs.len(),
);
for (peers, msg) in msgs {
match msg {
Versioned::V1(msg) => send_validation_message_v1(
&mut network_service,
peers,
WireMessage::ProtocolMessage(msg),
&metrics,
),
}
}
},
NetworkBridgeTxMessage::SendCollationMessage(peers, msg) => {
gum::trace!(
target: LOG_TARGET,
action = "SendCollationMessages",
num_messages = 1usize,
);
match msg {
Versioned::V1(msg) => send_collation_message_v1(
&mut network_service,
peers,
WireMessage::ProtocolMessage(msg),
&metrics,
),
}
},
NetworkBridgeTxMessage::SendCollationMessages(msgs) => {
gum::trace!(
target: LOG_TARGET,
action = "SendCollationMessages",
num_messages = %msgs.len(),
);
for (peers, msg) in msgs {
match msg {
Versioned::V1(msg) => send_collation_message_v1(
&mut network_service,
peers,
WireMessage::ProtocolMessage(msg),
&metrics,
),
}
}
},
NetworkBridgeTxMessage::SendRequests(reqs, if_disconnected) => {
gum::trace!(
target: LOG_TARGET,
action = "SendRequests",
num_requests = %reqs.len(),
);
for req in reqs {
network_service
.start_request(&mut authority_discovery_service, req, if_disconnected)
.await;
}
},
NetworkBridgeTxMessage::ConnectToValidators { validator_ids, peer_set, failed } => {
gum::trace!(
target: LOG_TARGET,
action = "ConnectToValidators",
peer_set = ?peer_set,
ids = ?validator_ids,
"Received a validator connection request",
);
metrics.note_desired_peer_count(peer_set, validator_ids.len());
let (network_service, ads) = validator_discovery
.on_request(
validator_ids,
peer_set,
failed,
network_service,
authority_discovery_service,
)
.await;
return (network_service, ads)
},
NetworkBridgeTxMessage::ConnectToResolvedValidators { validator_addrs, peer_set } => {
gum::trace!(
target: LOG_TARGET,
action = "ConnectToPeers",
peer_set = ?peer_set,
?validator_addrs,
"Received a resolved validator connection request",
);
metrics.note_desired_peer_count(peer_set, validator_addrs.len());
let all_addrs = validator_addrs.into_iter().flatten().collect();
let network_service = validator_discovery
.on_resolved_request(all_addrs, peer_set, network_service)
.await;
return (network_service, authority_discovery_service)
},
}
(network_service, authority_discovery_service)
}
#[overseer::contextbounds(NetworkBridgeTx, prefix = self::overseer)]
async fn run_network_out<N, AD, Context>(
bridge: NetworkBridgeTx<N, AD>,
ctx: Context,
) -> Result<(), Error>
where
N: Network,
AD: validator_discovery::AuthorityDiscovery + Clone + Sync,
{
let NetworkBridgeTx { network_service, authority_discovery_service, metrics } = bridge;
handle_subsystem_messages(ctx, network_service, authority_discovery_service, metrics).await?;
Ok(())
}
fn send_validation_message_v1(
net: &mut impl Network,
peers: Vec<PeerId>,
message: WireMessage<protocol_v1::ValidationProtocol>,
metrics: &Metrics,
) {
send_message(net, peers, PeerSet::Validation, 1, message, metrics);
}
fn send_collation_message_v1(
net: &mut impl Network,
peers: Vec<PeerId>,
message: WireMessage<protocol_v1::CollationProtocol>,
metrics: &Metrics,
) {
send_message(net, peers, PeerSet::Collation, 1, message, metrics)
}
@@ -0,0 +1,298 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use futures::{executor, stream::BoxStream};
use polkadot_node_subsystem_util::TimeoutExt;
use async_trait::async_trait;
use parking_lot::Mutex;
use std::{borrow::Cow, collections::HashSet};
use sc_network::{Event as NetworkEvent, IfDisconnected};
use polkadot_node_network_protocol::{
request_response::outgoing::Requests, ObservedRole, Versioned,
};
use polkadot_node_subsystem::{FromOrchestra, OverseerSignal};
use polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle;
use polkadot_node_subsystem_util::metered;
use polkadot_primitives::v2::AuthorityDiscoveryId;
use polkadot_primitives_test_helpers::dummy_collator_signature;
use sc_network::Multiaddr;
use sp_keyring::Sr25519Keyring;
const TIMEOUT: std::time::Duration = polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle::<NetworkBridgeTxMessage>::TIMEOUT;
use crate::{network::Network, validator_discovery::AuthorityDiscovery, Rep};
#[derive(Debug, PartialEq)]
pub enum NetworkAction {
/// Note a change in reputation for a peer.
ReputationChange(PeerId, Rep),
/// Disconnect a peer from the given peer-set.
DisconnectPeer(PeerId, PeerSet),
/// Write a notification to a given peer on the given peer-set.
WriteNotification(PeerId, PeerSet, Vec<u8>),
}
// The subsystem's view of the network - only supports a single call to `event_stream`.
#[derive(Clone)]
struct TestNetwork {
net_events: Arc<Mutex<Option<metered::MeteredReceiver<NetworkEvent>>>>,
action_tx: Arc<Mutex<metered::UnboundedMeteredSender<NetworkAction>>>,
}
#[derive(Clone, Debug)]
struct TestAuthorityDiscovery;
// The test's view of the network. This receives updates from the subsystem in the form
// of `NetworkAction`s.
struct TestNetworkHandle {
action_rx: metered::UnboundedMeteredReceiver<NetworkAction>,
net_tx: metered::MeteredSender<NetworkEvent>,
}
fn new_test_network() -> (TestNetwork, TestNetworkHandle, TestAuthorityDiscovery) {
let (net_tx, net_rx) = metered::channel(10);
let (action_tx, action_rx) = metered::unbounded();
(
TestNetwork {
net_events: Arc::new(Mutex::new(Some(net_rx))),
action_tx: Arc::new(Mutex::new(action_tx)),
},
TestNetworkHandle { action_rx, net_tx },
TestAuthorityDiscovery,
)
}
#[async_trait]
impl Network for TestNetwork {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
self.net_events
.lock()
.take()
.expect("Subsystem made more than one call to `event_stream`")
.boxed()
}
async fn set_reserved_peers(
&mut self,
_protocol: Cow<'static, str>,
_: HashSet<Multiaddr>,
) -> Result<(), String> {
Ok(())
}
async fn remove_from_peers_set(&mut self, _protocol: Cow<'static, str>, _: Vec<PeerId>) {}
async fn start_request<AD: AuthorityDiscovery>(
&self,
_: &mut AD,
_: Requests,
_: IfDisconnected,
) {
}
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
self.action_tx
.lock()
.unbounded_send(NetworkAction::ReputationChange(who, cost_benefit))
.unwrap();
}
fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) {
self.action_tx
.lock()
.unbounded_send(NetworkAction::DisconnectPeer(who, peer_set))
.unwrap();
}
fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec<u8>) {
self.action_tx
.lock()
.unbounded_send(NetworkAction::WriteNotification(who, peer_set, message))
.unwrap();
}
}
#[async_trait]
impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery {
async fn get_addresses_by_authority_id(
&mut self,
_authority: AuthorityDiscoveryId,
) -> Option<HashSet<Multiaddr>> {
None
}
async fn get_authority_ids_by_peer_id(
&mut self,
_peer_id: PeerId,
) -> Option<HashSet<AuthorityDiscoveryId>> {
None
}
}
impl TestNetworkHandle {
// Get the next network action.
async fn next_network_action(&mut self) -> NetworkAction {
self.action_rx.next().await.expect("subsystem concluded early")
}
async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) {
self.send_network_event(NetworkEvent::NotificationStreamOpened {
remote: peer,
protocol: peer_set.into_default_protocol_name(),
negotiated_fallback: None,
role: role.into(),
})
.await;
}
async fn send_network_event(&mut self, event: NetworkEvent) {
self.net_tx.send(event).await.expect("subsystem concluded early");
}
}
type VirtualOverseer = TestSubsystemContextHandle<NetworkBridgeTxMessage>;
struct TestHarness {
network_handle: TestNetworkHandle,
virtual_overseer: VirtualOverseer,
}
fn test_harness<T: Future<Output = VirtualOverseer>>(test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::TaskExecutor::new();
let (network, network_handle, discovery) = new_test_network();
let (context, virtual_overseer) =
polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let bridge_out = NetworkBridgeTx::new(network, discovery, Metrics(None));
let network_bridge_out_fut = run_network_out(bridge_out, context)
.map_err(|e| panic!("bridge-out subsystem execution failed {:?}", e))
.map(|_| ());
let test_fut = test(TestHarness { network_handle, virtual_overseer });
futures::pin_mut!(test_fut);
futures::pin_mut!(network_bridge_out_fut);
let _ = executor::block_on(future::join(
async move {
let mut virtual_overseer = test_fut.await;
virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
},
network_bridge_out_fut,
));
}
#[test]
fn send_messages_to_peers() {
test_harness(|test_harness| async move {
let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;
let peer = PeerId::random();
network_handle
.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full)
.timeout(TIMEOUT)
.await
.expect("Timeout does not occur");
// the outgoing side does not consume network messages
// so the single item sink has to be free explicitly
network_handle
.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full)
.timeout(TIMEOUT)
.await
.expect("Timeout does not occur");
// send a validation protocol message.
{
let approval_distribution_message =
protocol_v1::ApprovalDistributionMessage::Approvals(Vec::new());
let message_v1 = protocol_v1::ValidationProtocol::ApprovalDistribution(
approval_distribution_message.clone(),
);
virtual_overseer
.send(FromOrchestra::Communication {
msg: NetworkBridgeTxMessage::SendValidationMessage(
vec![peer.clone()],
Versioned::V1(message_v1.clone()),
),
})
.timeout(TIMEOUT)
.await
.expect("Timeout does not occur");
assert_eq!(
network_handle
.next_network_action()
.timeout(TIMEOUT)
.await
.expect("Timeout does not occur"),
NetworkAction::WriteNotification(
peer.clone(),
PeerSet::Validation,
WireMessage::ProtocolMessage(message_v1).encode(),
)
);
}
// send a collation protocol message.
{
let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare(
Sr25519Keyring::Alice.public().into(),
0_u32.into(),
dummy_collator_signature(),
);
let message_v1 =
protocol_v1::CollationProtocol::CollatorProtocol(collator_protocol_message.clone());
virtual_overseer
.send(FromOrchestra::Communication {
msg: NetworkBridgeTxMessage::SendCollationMessage(
vec![peer.clone()],
Versioned::V1(message_v1.clone()),
),
})
.await;
assert_eq!(
network_handle
.next_network_action()
.timeout(TIMEOUT)
.await
.expect("Timeout does not occur"),
NetworkAction::WriteNotification(
peer.clone(),
PeerSet::Collation,
WireMessage::ProtocolMessage(message_v1).encode(),
)
);
}
virtual_overseer
});
}