diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 87898b940d..8e8dd5147c 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -49,7 +49,6 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus}; pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority}; use std::collections::{HashMap, hash_map, HashSet}; -use std::iter::ExactSizeIterator; use std::sync::Arc; mod validator_discovery; @@ -413,7 +412,7 @@ where &shared, finalized_number, &metrics, - ).await?; + ); } } } @@ -443,7 +442,7 @@ where action = "ReportPeer" ); } - network_service.report_peer(peer, rep).await? + network_service.report_peer(peer, rep); } NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => { tracing::trace!( @@ -452,7 +451,7 @@ where ?peer, peer_set = ?peer_set, ); - network_service.disconnect_peer(peer, peer_set).await?; + network_service.disconnect_peer(peer, peer_set); } NetworkBridgeMessage::SendValidationMessage(peers, msg) => { tracing::trace!( @@ -467,7 +466,7 @@ where PeerSet::Validation, WireMessage::ProtocolMessage(msg), &metrics, - ).await? + ); } NetworkBridgeMessage::SendValidationMessages(msgs) => { tracing::trace!( @@ -483,7 +482,7 @@ where PeerSet::Validation, WireMessage::ProtocolMessage(msg), &metrics, - ).await? + ); } } NetworkBridgeMessage::SendCollationMessage(peers, msg) => { @@ -499,7 +498,7 @@ where PeerSet::Collation, WireMessage::ProtocolMessage(msg), &metrics, - ).await? + ); } NetworkBridgeMessage::SendCollationMessages(msgs) => { tracing::trace!( @@ -515,7 +514,7 @@ where PeerSet::Collation, WireMessage::ProtocolMessage(msg), &metrics, - ).await? + ); } } NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => { @@ -595,15 +594,16 @@ where async fn handle_network_messages( mut sender: impl SubsystemSender, mut network_service: impl Network, - mut network_stream: BoxStream<'static, NetworkEvent>, + network_stream: BoxStream<'static, NetworkEvent>, mut authority_discovery_service: AD, mut request_multiplexer: RequestMultiplexer, metrics: Metrics, shared: Shared, ) -> Result<(), UnexpectedAbort> { + let mut network_stream = network_stream.fuse(); loop { futures::select! { - network_event = network_stream.next().fuse() => match network_event { + network_event = network_stream.next() => match network_event { None => return Err(UnexpectedAbort::EventStreamConcluded), Some(NetworkEvent::Dht(_)) | Some(NetworkEvent::SyncConnected { .. }) @@ -668,7 +668,7 @@ async fn handle_network_messages( local_view, ), &metrics, - ).await?; + ); } PeerSet::Collation => { dispatch_collation_events_to_all( @@ -690,7 +690,7 @@ async fn handle_network_messages( local_view, ), &metrics, - ).await?; + ); } } } @@ -754,7 +754,7 @@ async fn handle_network_messages( action = "ReportPeer" ); - network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?; + network_service.report_peer(remote, MALFORMED_MESSAGE_COST); continue; } Ok(v) => v, @@ -778,7 +778,7 @@ async fn handle_network_messages( action = "ReportPeer" ); - network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?; + network_service.report_peer(remote, MALFORMED_MESSAGE_COST); continue; } Ok(c_messages) => { @@ -803,7 +803,7 @@ async fn handle_network_messages( ); for report in reports { - network_service.report_peer(remote.clone(), report).await?; + network_service.report_peer(remote.clone(), report); } dispatch_validation_events_to_all(events, &mut sender).await; @@ -819,7 +819,7 @@ async fn handle_network_messages( ); for report in reports { - network_service.report_peer(remote.clone(), report).await?; + network_service.report_peer(remote.clone(), report); } @@ -833,10 +833,7 @@ async fn handle_network_messages( req_res_event = request_multiplexer.next().fuse() => match req_res_event { None => return Err(UnexpectedAbort::RequestStreamConcluded), Some(Err(err)) => { - sender.send_message(NetworkBridgeMessage::ReportPeer( - err.peer, - MALFORMED_MESSAGE_COST, - ).into()).await; + network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST); } Some(Ok(msg)) => { sender.send_message(msg).await; @@ -874,7 +871,7 @@ where authority_discovery_service, metrics, sync_oracle, - } = bridge; + } = bridge; let statement_receiver = request_multiplexer .get_statement_fetching() @@ -953,14 +950,14 @@ fn construct_view(live_heads: impl DoubleEndedIterator, finalized_n ) } -async fn update_our_view( +fn update_our_view( net: &mut impl Network, ctx: &mut impl SubsystemContext, live_heads: &[ActivatedLeaf], shared: &Shared, finalized_number: BlockNumber, metrics: &Metrics, -) -> SubsystemResult<()> { +) { let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number); let (validation_peers, collation_peers) = { @@ -973,11 +970,11 @@ async fn update_our_view( // there is no need to send anything. match shared.local_view { Some(ref v) if v.check_heads_eq(&new_view) => { - return Ok(()) + return; } None if live_heads.is_empty() => { shared.local_view = Some(new_view); - return Ok(()) + return; } _ => { shared.local_view = Some(new_view.clone()); @@ -996,14 +993,14 @@ async fn update_our_view( validation_peers, WireMessage::ViewUpdate(new_view.clone()), metrics, - ).await?; + ); send_collation_message( net, collation_peers, WireMessage::ViewUpdate(new_view), metrics, - ).await?; + ); let our_view = OurView::new( live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)), @@ -1019,8 +1016,6 @@ async fn update_our_view( NetworkBridgeEvent::OurViewChange(our_view), ctx.sender(), ); - - Ok(()) } // Handle messages on a specific peer-set. The peer is expected to be connected on that @@ -1075,30 +1070,22 @@ fn handle_peer_messages( (outgoing_messages, reports) } -async fn send_validation_message( +fn send_validation_message( net: &mut impl Network, - peers: I, + peers: Vec, message: WireMessage, metrics: &Metrics, -) -> SubsystemResult<()> - where - I: IntoIterator, - I::IntoIter: ExactSizeIterator, -{ - send_message(net, peers, PeerSet::Validation, message, metrics).await +) { + send_message(net, peers, PeerSet::Validation, message, metrics); } -async fn send_collation_message( +fn send_collation_message( net: &mut impl Network, - peers: I, + peers: Vec, message: WireMessage, metrics: &Metrics, -) -> SubsystemResult<()> - where - I: IntoIterator, - I::IntoIter: ExactSizeIterator, -{ - send_message(net, peers, PeerSet::Collation, message, metrics).await +) { + send_message(net, peers, PeerSet::Collation, message, metrics) } diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs index 40b2daa146..3ffdb4baa1 100644 --- a/polkadot/node/network/bridge/src/network.rs +++ b/polkadot/node/network/bridge/src/network.rs @@ -16,11 +16,9 @@ use std::borrow::Cow; use std::collections::HashSet; -use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; -use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::BoxStream; @@ -36,7 +34,6 @@ use polkadot_node_network_protocol::{ PeerId, UnifiedReputationChange as Rep, }; use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash}; -use polkadot_subsystem::{SubsystemError, SubsystemResult}; use crate::validator_discovery::AuthorityDiscovery; @@ -47,62 +44,32 @@ use super::LOG_TARGET; /// This function is only used internally by the network-bridge, which is responsible to only send /// messages that are compatible with the passed peer set, as that is currently not enforced by /// this function. These are messages of type `WireMessage` parameterized on the matching type. -pub(crate) async fn send_message( +pub(crate) fn send_message( net: &mut impl Network, - peers: I, + mut peers: Vec, peer_set: PeerSet, message: M, metrics: &super::Metrics, -) -> SubsystemResult<()> +) where M: Encode + Clone, - I: IntoIterator, - I::IntoIter: ExactSizeIterator, { - let mut message_producer = stream::iter({ - let peers = peers.into_iter(); - let n_peers = peers.len(); - let mut message = { - let encoded = message.encode(); - metrics.on_notification_sent(peer_set, encoded.len(), n_peers); + let message = { + let encoded = message.encode(); + metrics.on_notification_sent(peer_set, encoded.len(), peers.len()); + encoded + }; - Some(encoded) - }; - - peers.enumerate().map(move |(i, peer)| { - // optimization: avoid cloning the message for the last peer in the - // list. The message payload can be quite large. If the underlying - // network used `Bytes` this would not be necessary. - let message = if i == n_peers - 1 { - message - .take() - .expect("Only taken in last iteration of loop, never afterwards; qed") - } else { - message - .as_ref() - .expect("Only taken in last iteration of loop, we are not there yet; qed") - .clone() - }; - - Ok(NetworkAction::WriteNotification(peer, peer_set, message)) - }) + // optimization: avoid cloning the message for the last peer in the + // list. The message payload can be quite large. If the underlying + // network used `Bytes` this would not be necessary. + let last_peer = peers.pop(); + peers.into_iter().for_each(|peer| { + net.write_notification(peer, peer_set, message.clone()); }); - - net.action_sink().send_all(&mut message_producer).await -} - -/// An action to be carried out by the network. -/// -/// This type is used for implementing `Sink` in order to communicate asynchronously with the -/// underlying network implementation in the `Network` trait. -#[derive(Debug, PartialEq)] -pub enum NetworkAction { - /// Note a change in reputation for a peer. - ReputationChange(PeerId, Rep), - /// Disconnect a peer from the given peer-set. - DisconnectPeer(PeerId, PeerSet), - /// Write a notification to a given peer on the given peer-set. - WriteNotification(PeerId, PeerSet, Vec), + if let Some(peer) = last_peer { + net.write_notification(peer, peer_set, message); + } } /// An abstraction over networking for the purposes of this subsystem. @@ -117,14 +84,18 @@ pub trait Network: Clone + Send + 'static { /// Ask the network to keep a substream open with these nodes and not disconnect from them /// until removed from the protocol's peer set. /// Note that `out_peers` setting has no effect on this. - async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet) -> Result<(), String>; - /// Cancels the effects of `add_to_peers_set`. - async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet) -> Result<(), String>; + async fn add_to_peers_set( + &mut self, + protocol: Cow<'static, str>, + multiaddresses: HashSet, + ) -> Result<(), String>; - /// Get access to an underlying sink for all network actions. - fn action_sink<'a>( - &'a mut self, - ) -> Pin + Send + 'a>>; + /// Cancels the effects of `add_to_peers_set`. + async fn remove_from_peers_set( + &mut self, + protocol: Cow<'static, str>, + multiaddresses: HashSet, + ) -> Result<(), String>; /// Send a request to a remote peer. async fn start_request( @@ -135,47 +106,18 @@ pub trait Network: Clone + Send + 'static { ); /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. - fn report_peer( - &mut self, - who: PeerId, - cost_benefit: Rep, - ) -> BoxFuture> { - async move { - self.action_sink() - .send(NetworkAction::ReputationChange(who, cost_benefit)) - .await - } - .boxed() - } + fn report_peer(&self, who: PeerId, cost_benefit: Rep); /// Disconnect a given peer from the peer set specified without harming reputation. - fn disconnect_peer( - &mut self, - who: PeerId, - peer_set: PeerSet, - ) -> BoxFuture> { - async move { - self.action_sink() - .send(NetworkAction::DisconnectPeer(who, peer_set)) - .await - } - .boxed() - } + fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet); /// Write a notification to a peer on the given peer-set's protocol. fn write_notification( - &mut self, + &self, who: PeerId, peer_set: PeerSet, message: Vec, - ) -> BoxFuture> { - async move { - self.action_sink() - .send(NetworkAction::WriteNotification(who, peer_set, message)) - .await - } - .boxed() - } + ); } #[async_trait] @@ -184,56 +126,42 @@ impl Network for Arc> { NetworkService::event_stream(self, "polkadot-network-bridge").boxed() } - async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet) -> Result<(), String> { + async fn add_to_peers_set( + &mut self, + protocol: Cow<'static, str>, + multiaddresses: HashSet, + ) -> Result<(), String> { sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses) } - async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet) -> Result<(), String> { - sc_network::NetworkService::remove_peers_from_reserved_set(&**self, protocol.clone(), multiaddresses.clone())?; + async fn remove_from_peers_set( + &mut self, + protocol: Cow<'static, str>, + multiaddresses: HashSet, + ) -> Result<(), String> { + sc_network::NetworkService::remove_peers_from_reserved_set( + &**self, + protocol.clone(), + multiaddresses.clone(), + )?; sc_network::NetworkService::remove_from_peers_set(&**self, protocol, multiaddresses) } - fn action_sink<'a>( - &'a mut self, - ) -> Pin + Send + 'a>> { - use futures::task::{Context, Poll}; + fn report_peer(&self, who: PeerId, cost_benefit: Rep) { + sc_network::NetworkService::report_peer(&**self, who, cost_benefit.into_base_rep()); + } - // wrapper around a NetworkService to make it act like a sink. - struct ActionSink<'b>(&'b NetworkService); + fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) { + sc_network::NetworkService::disconnect_peer(&**self, who, peer_set.into_protocol_name()); + } - impl<'b> Sink for ActionSink<'b> { - type Error = SubsystemError; - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> { - match action { - NetworkAction::ReputationChange(peer, cost_benefit) => { - self.0.report_peer(peer, cost_benefit.into_base_rep()) - } - NetworkAction::DisconnectPeer(peer, peer_set) => self - .0 - .disconnect_peer(peer, peer_set.into_protocol_name()), - NetworkAction::WriteNotification(peer, peer_set, message) => self - .0 - .write_notification(peer, peer_set.into_protocol_name(), message), - } - - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - } - - Box::pin(ActionSink(&**self)) + fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec) { + sc_network::NetworkService::write_notification( + &**self, + who, + peer_set.into_protocol_name(), + message, + ); } async fn start_request( diff --git a/polkadot/node/network/bridge/src/tests.rs b/polkadot/node/network/bridge/src/tests.rs index 736fed83c5..8785238b35 100644 --- a/polkadot/node/network/bridge/src/tests.rs +++ b/polkadot/node/network/bridge/src/tests.rs @@ -21,7 +21,6 @@ use futures::channel::oneshot; use std::borrow::Cow; use std::collections::HashSet; -use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use async_trait::async_trait; use parking_lot::Mutex; @@ -46,14 +45,25 @@ use sp_keyring::Sr25519Keyring; use polkadot_primitives::v1::AuthorityDiscoveryId; use polkadot_node_network_protocol::{ObservedRole, request_response::request::Requests}; -use crate::network::{Network, NetworkAction}; +use crate::network::Network; use crate::validator_discovery::AuthorityDiscovery; +use crate::Rep; + +#[derive(Debug, PartialEq)] +pub enum NetworkAction { + /// Note a change in reputation for a peer. + ReputationChange(PeerId, Rep), + /// Disconnect a peer from the given peer-set. + DisconnectPeer(PeerId, PeerSet), + /// Write a notification to a given peer on the given peer-set. + WriteNotification(PeerId, PeerSet, Vec), +} // The subsystem's view of the network - only supports a single call to `event_stream`. #[derive(Clone)] struct TestNetwork { net_events: Arc>>>, - action_tx: metered::UnboundedMeteredSender, + action_tx: Arc>>, _req_configs: Vec, } @@ -78,7 +88,7 @@ fn new_test_network(req_configs: Vec) -> ( ( TestNetwork { net_events: Arc::new(Mutex::new(Some(net_rx))), - action_tx, + action_tx: Arc::new(Mutex::new(action_tx)), _req_configs: req_configs, }, TestNetworkHandle { @@ -106,13 +116,30 @@ impl Network for TestNetwork { Ok(()) } - fn action_sink<'a>(&'a mut self) - -> Pin + Send + 'a>> - { - Box::pin((&mut self.action_tx).sink_map_err(Into::into)) + async fn start_request(&self, _: &mut AD, _: Requests, _: IfDisconnected) { } - async fn start_request(&self, _: &mut AD, _: Requests, _: IfDisconnected) { + fn report_peer(&self, who: PeerId, cost_benefit: Rep) { + self.action_tx.lock().unbounded_send( + NetworkAction::ReputationChange(who, cost_benefit) + ).unwrap(); + } + + fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) { + self.action_tx.lock().unbounded_send( + NetworkAction::DisconnectPeer(who, peer_set) + ).unwrap(); + } + + fn write_notification( + &self, + who: PeerId, + peer_set: PeerSet, + message: Vec, + ) { + self.action_tx.lock().unbounded_send( + NetworkAction::WriteNotification(who, peer_set, message) + ).unwrap(); } } diff --git a/polkadot/node/network/bridge/src/validator_discovery.rs b/polkadot/node/network/bridge/src/validator_discovery.rs index 1378a457b5..2debf12d8c 100644 --- a/polkadot/node/network/bridge/src/validator_discovery.rs +++ b/polkadot/node/network/bridge/src/validator_discovery.rs @@ -143,10 +143,10 @@ impl Service { #[cfg(test)] mod tests { use super::*; - use crate::network::{Network, NetworkAction}; + use crate::network::Network; - use std::{borrow::Cow, pin::Pin, collections::HashMap}; - use futures::{sink::Sink, stream::BoxStream}; + use std::{borrow::Cow, collections::HashMap}; + use futures::stream::BoxStream; use sc_network::{Event as NetworkEvent, IfDisconnected}; use sp_keyring::Sr25519Keyring; use polkadot_node_network_protocol::request_response::request::Requests; @@ -203,13 +203,24 @@ mod tests { Ok(()) } - fn action_sink<'a>(&'a mut self) - -> Pin + Send + 'a>> - { + async fn start_request(&self, _: &mut AD, _: Requests, _: IfDisconnected) { + } + + fn report_peer(&self, _: PeerId, _: crate::Rep) { panic!() } - async fn start_request(&self, _: &mut AD, _: Requests, _: IfDisconnected) { + fn disconnect_peer(&self, _: PeerId, _: PeerSet) { + panic!() + } + + fn write_notification( + &self, + _: PeerId, + _: PeerSet, + _: Vec, + ) { + panic!() } }