mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 00:31:07 +00:00
network-bridge: remove action_sink abstraction (#3308)
* network-bridge: remove action_sink abstraction * another wtf * filter out event stream * Revert "filter out event stream" This reverts commit 63bd8f5de5b44d415dcb205e1b9fad8145200e06. * retain cleanup though
This commit is contained in:
@@ -49,7 +49,6 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
|
||||
|
||||
use std::collections::{HashMap, hash_map, HashSet};
|
||||
use std::iter::ExactSizeIterator;
|
||||
use std::sync::Arc;
|
||||
|
||||
mod validator_discovery;
|
||||
@@ -413,7 +412,7 @@ where
|
||||
&shared,
|
||||
finalized_number,
|
||||
&metrics,
|
||||
).await?;
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -443,7 +442,7 @@ where
|
||||
action = "ReportPeer"
|
||||
);
|
||||
}
|
||||
network_service.report_peer(peer, rep).await?
|
||||
network_service.report_peer(peer, rep);
|
||||
}
|
||||
NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => {
|
||||
tracing::trace!(
|
||||
@@ -452,7 +451,7 @@ where
|
||||
?peer,
|
||||
peer_set = ?peer_set,
|
||||
);
|
||||
network_service.disconnect_peer(peer, peer_set).await?;
|
||||
network_service.disconnect_peer(peer, peer_set);
|
||||
}
|
||||
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
|
||||
tracing::trace!(
|
||||
@@ -467,7 +466,7 @@ where
|
||||
PeerSet::Validation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
&metrics,
|
||||
).await?
|
||||
);
|
||||
}
|
||||
NetworkBridgeMessage::SendValidationMessages(msgs) => {
|
||||
tracing::trace!(
|
||||
@@ -483,7 +482,7 @@ where
|
||||
PeerSet::Validation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
&metrics,
|
||||
).await?
|
||||
);
|
||||
}
|
||||
}
|
||||
NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
|
||||
@@ -499,7 +498,7 @@ where
|
||||
PeerSet::Collation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
&metrics,
|
||||
).await?
|
||||
);
|
||||
}
|
||||
NetworkBridgeMessage::SendCollationMessages(msgs) => {
|
||||
tracing::trace!(
|
||||
@@ -515,7 +514,7 @@ where
|
||||
PeerSet::Collation,
|
||||
WireMessage::ProtocolMessage(msg),
|
||||
&metrics,
|
||||
).await?
|
||||
);
|
||||
}
|
||||
}
|
||||
NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => {
|
||||
@@ -595,15 +594,16 @@ where
|
||||
async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
mut sender: impl SubsystemSender,
|
||||
mut network_service: impl Network,
|
||||
mut network_stream: BoxStream<'static, NetworkEvent>,
|
||||
network_stream: BoxStream<'static, NetworkEvent>,
|
||||
mut authority_discovery_service: AD,
|
||||
mut request_multiplexer: RequestMultiplexer,
|
||||
metrics: Metrics,
|
||||
shared: Shared,
|
||||
) -> Result<(), UnexpectedAbort> {
|
||||
let mut network_stream = network_stream.fuse();
|
||||
loop {
|
||||
futures::select! {
|
||||
network_event = network_stream.next().fuse() => match network_event {
|
||||
network_event = network_stream.next() => match network_event {
|
||||
None => return Err(UnexpectedAbort::EventStreamConcluded),
|
||||
Some(NetworkEvent::Dht(_))
|
||||
| Some(NetworkEvent::SyncConnected { .. })
|
||||
@@ -668,7 +668,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
local_view,
|
||||
),
|
||||
&metrics,
|
||||
).await?;
|
||||
);
|
||||
}
|
||||
PeerSet::Collation => {
|
||||
dispatch_collation_events_to_all(
|
||||
@@ -690,7 +690,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
local_view,
|
||||
),
|
||||
&metrics,
|
||||
).await?;
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -754,7 +754,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
action = "ReportPeer"
|
||||
);
|
||||
|
||||
network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?;
|
||||
network_service.report_peer(remote, MALFORMED_MESSAGE_COST);
|
||||
continue;
|
||||
}
|
||||
Ok(v) => v,
|
||||
@@ -778,7 +778,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
action = "ReportPeer"
|
||||
);
|
||||
|
||||
network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?;
|
||||
network_service.report_peer(remote, MALFORMED_MESSAGE_COST);
|
||||
continue;
|
||||
}
|
||||
Ok(c_messages) => {
|
||||
@@ -803,7 +803,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
);
|
||||
|
||||
for report in reports {
|
||||
network_service.report_peer(remote.clone(), report).await?;
|
||||
network_service.report_peer(remote.clone(), report);
|
||||
}
|
||||
|
||||
dispatch_validation_events_to_all(events, &mut sender).await;
|
||||
@@ -819,7 +819,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
);
|
||||
|
||||
for report in reports {
|
||||
network_service.report_peer(remote.clone(), report).await?;
|
||||
network_service.report_peer(remote.clone(), report);
|
||||
}
|
||||
|
||||
|
||||
@@ -833,10 +833,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
|
||||
req_res_event = request_multiplexer.next().fuse() => match req_res_event {
|
||||
None => return Err(UnexpectedAbort::RequestStreamConcluded),
|
||||
Some(Err(err)) => {
|
||||
sender.send_message(NetworkBridgeMessage::ReportPeer(
|
||||
err.peer,
|
||||
MALFORMED_MESSAGE_COST,
|
||||
).into()).await;
|
||||
network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST);
|
||||
}
|
||||
Some(Ok(msg)) => {
|
||||
sender.send_message(msg).await;
|
||||
@@ -874,7 +871,7 @@ where
|
||||
authority_discovery_service,
|
||||
metrics,
|
||||
sync_oracle,
|
||||
} = bridge;
|
||||
} = bridge;
|
||||
|
||||
let statement_receiver = request_multiplexer
|
||||
.get_statement_fetching()
|
||||
@@ -953,14 +950,14 @@ fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_n
|
||||
)
|
||||
}
|
||||
|
||||
async fn update_our_view(
|
||||
fn update_our_view(
|
||||
net: &mut impl Network,
|
||||
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
|
||||
live_heads: &[ActivatedLeaf],
|
||||
shared: &Shared,
|
||||
finalized_number: BlockNumber,
|
||||
metrics: &Metrics,
|
||||
) -> SubsystemResult<()> {
|
||||
) {
|
||||
let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);
|
||||
|
||||
let (validation_peers, collation_peers) = {
|
||||
@@ -973,11 +970,11 @@ async fn update_our_view(
|
||||
// there is no need to send anything.
|
||||
match shared.local_view {
|
||||
Some(ref v) if v.check_heads_eq(&new_view) => {
|
||||
return Ok(())
|
||||
return;
|
||||
}
|
||||
None if live_heads.is_empty() => {
|
||||
shared.local_view = Some(new_view);
|
||||
return Ok(())
|
||||
return;
|
||||
}
|
||||
_ => {
|
||||
shared.local_view = Some(new_view.clone());
|
||||
@@ -996,14 +993,14 @@ async fn update_our_view(
|
||||
validation_peers,
|
||||
WireMessage::ViewUpdate(new_view.clone()),
|
||||
metrics,
|
||||
).await?;
|
||||
);
|
||||
|
||||
send_collation_message(
|
||||
net,
|
||||
collation_peers,
|
||||
WireMessage::ViewUpdate(new_view),
|
||||
metrics,
|
||||
).await?;
|
||||
);
|
||||
|
||||
let our_view = OurView::new(
|
||||
live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)),
|
||||
@@ -1019,8 +1016,6 @@ async fn update_our_view(
|
||||
NetworkBridgeEvent::OurViewChange(our_view),
|
||||
ctx.sender(),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Handle messages on a specific peer-set. The peer is expected to be connected on that
|
||||
@@ -1075,30 +1070,22 @@ fn handle_peer_messages<M>(
|
||||
(outgoing_messages, reports)
|
||||
}
|
||||
|
||||
async fn send_validation_message<I>(
|
||||
fn send_validation_message(
|
||||
net: &mut impl Network,
|
||||
peers: I,
|
||||
peers: Vec<PeerId>,
|
||||
message: WireMessage<protocol_v1::ValidationProtocol>,
|
||||
metrics: &Metrics,
|
||||
) -> SubsystemResult<()>
|
||||
where
|
||||
I: IntoIterator<Item=PeerId>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
send_message(net, peers, PeerSet::Validation, message, metrics).await
|
||||
) {
|
||||
send_message(net, peers, PeerSet::Validation, message, metrics);
|
||||
}
|
||||
|
||||
async fn send_collation_message<I>(
|
||||
fn send_collation_message(
|
||||
net: &mut impl Network,
|
||||
peers: I,
|
||||
peers: Vec<PeerId>,
|
||||
message: WireMessage<protocol_v1::CollationProtocol>,
|
||||
metrics: &Metrics,
|
||||
) -> SubsystemResult<()>
|
||||
where
|
||||
I: IntoIterator<Item=PeerId>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
send_message(net, peers, PeerSet::Collation, message, metrics).await
|
||||
) {
|
||||
send_message(net, peers, PeerSet::Collation, message, metrics)
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -16,11 +16,9 @@
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashSet;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::future::BoxFuture;
|
||||
use futures::prelude::*;
|
||||
use futures::stream::BoxStream;
|
||||
|
||||
@@ -36,7 +34,6 @@ use polkadot_node_network_protocol::{
|
||||
PeerId, UnifiedReputationChange as Rep,
|
||||
};
|
||||
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
|
||||
use polkadot_subsystem::{SubsystemError, SubsystemResult};
|
||||
|
||||
use crate::validator_discovery::AuthorityDiscovery;
|
||||
|
||||
@@ -47,62 +44,32 @@ use super::LOG_TARGET;
|
||||
/// This function is only used internally by the network-bridge, which is responsible to only send
|
||||
/// messages that are compatible with the passed peer set, as that is currently not enforced by
|
||||
/// this function. These are messages of type `WireMessage` parameterized on the matching type.
|
||||
pub(crate) async fn send_message<M, I>(
|
||||
pub(crate) fn send_message<M>(
|
||||
net: &mut impl Network,
|
||||
peers: I,
|
||||
mut peers: Vec<PeerId>,
|
||||
peer_set: PeerSet,
|
||||
message: M,
|
||||
metrics: &super::Metrics,
|
||||
) -> SubsystemResult<()>
|
||||
)
|
||||
where
|
||||
M: Encode + Clone,
|
||||
I: IntoIterator<Item = PeerId>,
|
||||
I::IntoIter: ExactSizeIterator,
|
||||
{
|
||||
let mut message_producer = stream::iter({
|
||||
let peers = peers.into_iter();
|
||||
let n_peers = peers.len();
|
||||
let mut message = {
|
||||
let encoded = message.encode();
|
||||
metrics.on_notification_sent(peer_set, encoded.len(), n_peers);
|
||||
let message = {
|
||||
let encoded = message.encode();
|
||||
metrics.on_notification_sent(peer_set, encoded.len(), peers.len());
|
||||
encoded
|
||||
};
|
||||
|
||||
Some(encoded)
|
||||
};
|
||||
|
||||
peers.enumerate().map(move |(i, peer)| {
|
||||
// optimization: avoid cloning the message for the last peer in the
|
||||
// list. The message payload can be quite large. If the underlying
|
||||
// network used `Bytes` this would not be necessary.
|
||||
let message = if i == n_peers - 1 {
|
||||
message
|
||||
.take()
|
||||
.expect("Only taken in last iteration of loop, never afterwards; qed")
|
||||
} else {
|
||||
message
|
||||
.as_ref()
|
||||
.expect("Only taken in last iteration of loop, we are not there yet; qed")
|
||||
.clone()
|
||||
};
|
||||
|
||||
Ok(NetworkAction::WriteNotification(peer, peer_set, message))
|
||||
})
|
||||
// optimization: avoid cloning the message for the last peer in the
|
||||
// list. The message payload can be quite large. If the underlying
|
||||
// network used `Bytes` this would not be necessary.
|
||||
let last_peer = peers.pop();
|
||||
peers.into_iter().for_each(|peer| {
|
||||
net.write_notification(peer, peer_set, message.clone());
|
||||
});
|
||||
|
||||
net.action_sink().send_all(&mut message_producer).await
|
||||
}
|
||||
|
||||
/// An action to be carried out by the network.
|
||||
///
|
||||
/// This type is used for implementing `Sink` in order to communicate asynchronously with the
|
||||
/// underlying network implementation in the `Network` trait.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum NetworkAction {
|
||||
/// Note a change in reputation for a peer.
|
||||
ReputationChange(PeerId, Rep),
|
||||
/// Disconnect a peer from the given peer-set.
|
||||
DisconnectPeer(PeerId, PeerSet),
|
||||
/// Write a notification to a given peer on the given peer-set.
|
||||
WriteNotification(PeerId, PeerSet, Vec<u8>),
|
||||
if let Some(peer) = last_peer {
|
||||
net.write_notification(peer, peer_set, message);
|
||||
}
|
||||
}
|
||||
|
||||
/// An abstraction over networking for the purposes of this subsystem.
|
||||
@@ -117,14 +84,18 @@ pub trait Network: Clone + Send + 'static {
|
||||
/// Ask the network to keep a substream open with these nodes and not disconnect from them
|
||||
/// until removed from the protocol's peer set.
|
||||
/// Note that `out_peers` setting has no effect on this.
|
||||
async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
|
||||
/// Cancels the effects of `add_to_peers_set`.
|
||||
async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
|
||||
async fn add_to_peers_set(
|
||||
&mut self,
|
||||
protocol: Cow<'static, str>,
|
||||
multiaddresses: HashSet<Multiaddr>,
|
||||
) -> Result<(), String>;
|
||||
|
||||
/// Get access to an underlying sink for all network actions.
|
||||
fn action_sink<'a>(
|
||||
&'a mut self,
|
||||
) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>;
|
||||
/// Cancels the effects of `add_to_peers_set`.
|
||||
async fn remove_from_peers_set(
|
||||
&mut self,
|
||||
protocol: Cow<'static, str>,
|
||||
multiaddresses: HashSet<Multiaddr>,
|
||||
) -> Result<(), String>;
|
||||
|
||||
/// Send a request to a remote peer.
|
||||
async fn start_request<AD: AuthorityDiscovery>(
|
||||
@@ -135,47 +106,18 @@ pub trait Network: Clone + Send + 'static {
|
||||
);
|
||||
|
||||
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
|
||||
fn report_peer(
|
||||
&mut self,
|
||||
who: PeerId,
|
||||
cost_benefit: Rep,
|
||||
) -> BoxFuture<SubsystemResult<()>> {
|
||||
async move {
|
||||
self.action_sink()
|
||||
.send(NetworkAction::ReputationChange(who, cost_benefit))
|
||||
.await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
fn report_peer(&self, who: PeerId, cost_benefit: Rep);
|
||||
|
||||
/// Disconnect a given peer from the peer set specified without harming reputation.
|
||||
fn disconnect_peer(
|
||||
&mut self,
|
||||
who: PeerId,
|
||||
peer_set: PeerSet,
|
||||
) -> BoxFuture<SubsystemResult<()>> {
|
||||
async move {
|
||||
self.action_sink()
|
||||
.send(NetworkAction::DisconnectPeer(who, peer_set))
|
||||
.await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet);
|
||||
|
||||
/// Write a notification to a peer on the given peer-set's protocol.
|
||||
fn write_notification(
|
||||
&mut self,
|
||||
&self,
|
||||
who: PeerId,
|
||||
peer_set: PeerSet,
|
||||
message: Vec<u8>,
|
||||
) -> BoxFuture<SubsystemResult<()>> {
|
||||
async move {
|
||||
self.action_sink()
|
||||
.send(NetworkAction::WriteNotification(who, peer_set, message))
|
||||
.await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -184,56 +126,42 @@ impl Network for Arc<NetworkService<Block, Hash>> {
|
||||
NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
|
||||
}
|
||||
|
||||
async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
|
||||
async fn add_to_peers_set(
|
||||
&mut self,
|
||||
protocol: Cow<'static, str>,
|
||||
multiaddresses: HashSet<Multiaddr>,
|
||||
) -> Result<(), String> {
|
||||
sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
|
||||
}
|
||||
|
||||
async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
|
||||
sc_network::NetworkService::remove_peers_from_reserved_set(&**self, protocol.clone(), multiaddresses.clone())?;
|
||||
async fn remove_from_peers_set(
|
||||
&mut self,
|
||||
protocol: Cow<'static, str>,
|
||||
multiaddresses: HashSet<Multiaddr>,
|
||||
) -> Result<(), String> {
|
||||
sc_network::NetworkService::remove_peers_from_reserved_set(
|
||||
&**self,
|
||||
protocol.clone(),
|
||||
multiaddresses.clone(),
|
||||
)?;
|
||||
sc_network::NetworkService::remove_from_peers_set(&**self, protocol, multiaddresses)
|
||||
}
|
||||
|
||||
fn action_sink<'a>(
|
||||
&'a mut self,
|
||||
) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>> {
|
||||
use futures::task::{Context, Poll};
|
||||
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
|
||||
sc_network::NetworkService::report_peer(&**self, who, cost_benefit.into_base_rep());
|
||||
}
|
||||
|
||||
// wrapper around a NetworkService to make it act like a sink.
|
||||
struct ActionSink<'b>(&'b NetworkService<Block, Hash>);
|
||||
fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) {
|
||||
sc_network::NetworkService::disconnect_peer(&**self, who, peer_set.into_protocol_name());
|
||||
}
|
||||
|
||||
impl<'b> Sink<NetworkAction> for ActionSink<'b> {
|
||||
type Error = SubsystemError;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
|
||||
match action {
|
||||
NetworkAction::ReputationChange(peer, cost_benefit) => {
|
||||
self.0.report_peer(peer, cost_benefit.into_base_rep())
|
||||
}
|
||||
NetworkAction::DisconnectPeer(peer, peer_set) => self
|
||||
.0
|
||||
.disconnect_peer(peer, peer_set.into_protocol_name()),
|
||||
NetworkAction::WriteNotification(peer, peer_set, message) => self
|
||||
.0
|
||||
.write_notification(peer, peer_set.into_protocol_name(), message),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
Box::pin(ActionSink(&**self))
|
||||
fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec<u8>) {
|
||||
sc_network::NetworkService::write_notification(
|
||||
&**self,
|
||||
who,
|
||||
peer_set.into_protocol_name(),
|
||||
message,
|
||||
);
|
||||
}
|
||||
|
||||
async fn start_request<AD: AuthorityDiscovery>(
|
||||
|
||||
@@ -21,7 +21,6 @@ use futures::channel::oneshot;
|
||||
|
||||
use std::borrow::Cow;
|
||||
use std::collections::HashSet;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use async_trait::async_trait;
|
||||
use parking_lot::Mutex;
|
||||
@@ -46,14 +45,25 @@ use sp_keyring::Sr25519Keyring;
|
||||
use polkadot_primitives::v1::AuthorityDiscoveryId;
|
||||
use polkadot_node_network_protocol::{ObservedRole, request_response::request::Requests};
|
||||
|
||||
use crate::network::{Network, NetworkAction};
|
||||
use crate::network::Network;
|
||||
use crate::validator_discovery::AuthorityDiscovery;
|
||||
use crate::Rep;
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum NetworkAction {
|
||||
/// Note a change in reputation for a peer.
|
||||
ReputationChange(PeerId, Rep),
|
||||
/// Disconnect a peer from the given peer-set.
|
||||
DisconnectPeer(PeerId, PeerSet),
|
||||
/// Write a notification to a given peer on the given peer-set.
|
||||
WriteNotification(PeerId, PeerSet, Vec<u8>),
|
||||
}
|
||||
|
||||
// The subsystem's view of the network - only supports a single call to `event_stream`.
|
||||
#[derive(Clone)]
|
||||
struct TestNetwork {
|
||||
net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
|
||||
action_tx: metered::UnboundedMeteredSender<NetworkAction>,
|
||||
action_tx: Arc<Mutex<metered::UnboundedMeteredSender<NetworkAction>>>,
|
||||
_req_configs: Vec<RequestResponseConfig>,
|
||||
}
|
||||
|
||||
@@ -78,7 +88,7 @@ fn new_test_network(req_configs: Vec<RequestResponseConfig>) -> (
|
||||
(
|
||||
TestNetwork {
|
||||
net_events: Arc::new(Mutex::new(Some(net_rx))),
|
||||
action_tx,
|
||||
action_tx: Arc::new(Mutex::new(action_tx)),
|
||||
_req_configs: req_configs,
|
||||
},
|
||||
TestNetworkHandle {
|
||||
@@ -106,13 +116,30 @@ impl Network for TestNetwork {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn action_sink<'a>(&'a mut self)
|
||||
-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
|
||||
{
|
||||
Box::pin((&mut self.action_tx).sink_map_err(Into::into))
|
||||
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
|
||||
}
|
||||
|
||||
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
|
||||
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
|
||||
self.action_tx.lock().unbounded_send(
|
||||
NetworkAction::ReputationChange(who, cost_benefit)
|
||||
).unwrap();
|
||||
}
|
||||
|
||||
fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) {
|
||||
self.action_tx.lock().unbounded_send(
|
||||
NetworkAction::DisconnectPeer(who, peer_set)
|
||||
).unwrap();
|
||||
}
|
||||
|
||||
fn write_notification(
|
||||
&self,
|
||||
who: PeerId,
|
||||
peer_set: PeerSet,
|
||||
message: Vec<u8>,
|
||||
) {
|
||||
self.action_tx.lock().unbounded_send(
|
||||
NetworkAction::WriteNotification(who, peer_set, message)
|
||||
).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -143,10 +143,10 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::network::{Network, NetworkAction};
|
||||
use crate::network::Network;
|
||||
|
||||
use std::{borrow::Cow, pin::Pin, collections::HashMap};
|
||||
use futures::{sink::Sink, stream::BoxStream};
|
||||
use std::{borrow::Cow, collections::HashMap};
|
||||
use futures::stream::BoxStream;
|
||||
use sc_network::{Event as NetworkEvent, IfDisconnected};
|
||||
use sp_keyring::Sr25519Keyring;
|
||||
use polkadot_node_network_protocol::request_response::request::Requests;
|
||||
@@ -203,13 +203,24 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn action_sink<'a>(&'a mut self)
|
||||
-> Pin<Box<dyn Sink<NetworkAction, Error = polkadot_subsystem::SubsystemError> + Send + 'a>>
|
||||
{
|
||||
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
|
||||
}
|
||||
|
||||
fn report_peer(&self, _: PeerId, _: crate::Rep) {
|
||||
panic!()
|
||||
}
|
||||
|
||||
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
|
||||
fn disconnect_peer(&self, _: PeerId, _: PeerSet) {
|
||||
panic!()
|
||||
}
|
||||
|
||||
fn write_notification(
|
||||
&self,
|
||||
_: PeerId,
|
||||
_: PeerSet,
|
||||
_: Vec<u8>,
|
||||
) {
|
||||
panic!()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user