// Copyright 2020-2021 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . use std::borrow::Cow; use std::collections::HashSet; use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::BoxStream; use parity_scale_codec::Encode; use sc_network::Event as NetworkEvent; use sc_network::{IfDisconnected, NetworkService, OutboundFailure, RequestFailure}; use sc_network::{config::parse_addr, multiaddr::Multiaddr}; use polkadot_node_network_protocol::{ peer_set::PeerSet, request_response::{OutgoingRequest, Requests, Recipient}, PeerId, UnifiedReputationChange as Rep, }; use polkadot_primitives::v1::{Block, Hash}; use polkadot_subsystem::{SubsystemError, SubsystemResult}; use crate::validator_discovery::AuthorityDiscovery; use super::LOG_TARGET; /// Send a message to the network. /// /// This function is only used internally by the network-bridge, which is responsible to only send /// messages that are compatible with the passed peer set, as that is currently not enforced by /// this function. These are messages of type `WireMessage` parameterized on the matching type. pub(crate) async fn send_message( net: &mut impl Network, peers: I, peer_set: PeerSet, message: M, metrics: &super::Metrics, ) -> SubsystemResult<()> where M: Encode + Clone, I: IntoIterator, I::IntoIter: ExactSizeIterator, { let mut message_producer = stream::iter({ let peers = peers.into_iter(); let n_peers = peers.len(); let mut message = { let encoded = message.encode(); metrics.on_notification_sent(peer_set, encoded.len(), n_peers); Some(encoded) }; peers.enumerate().map(move |(i, peer)| { // optimization: avoid cloning the message for the last peer in the // list. The message payload can be quite large. If the underlying // network used `Bytes` this would not be necessary. let message = if i == n_peers - 1 { message .take() .expect("Only taken in last iteration of loop, never afterwards; qed") } else { message .as_ref() .expect("Only taken in last iteration of loop, we are not there yet; qed") .clone() }; Ok(NetworkAction::WriteNotification(peer, peer_set, message)) }) }); net.action_sink().send_all(&mut message_producer).await } /// An action to be carried out by the network. /// /// This type is used for implementing `Sink` in order to communicate asynchronously with the /// underlying network implementation in the `Network` trait. #[derive(Debug, PartialEq)] pub enum NetworkAction { /// Note a change in reputation for a peer. ReputationChange(PeerId, Rep), /// Disconnect a peer from the given peer-set. DisconnectPeer(PeerId, PeerSet), /// Write a notification to a given peer on the given peer-set. WriteNotification(PeerId, PeerSet, Vec), } /// An abstraction over networking for the purposes of this subsystem. #[async_trait] pub trait Network: Clone + Send + 'static { /// Get a stream of all events occurring on the network. This may include events unrelated /// to the Polkadot protocol - the user of this function should filter only for events related /// to the [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME) /// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME) fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>; /// Ask the network to keep a substream open with these nodes and not disconnect from them /// until removed from the protocol's peer set. /// Note that `out_peers` setting has no effect on this. async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet) -> Result<(), String>; /// Cancels the effects of `add_to_peers_set`. async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet) -> Result<(), String>; /// Get access to an underlying sink for all network actions. fn action_sink<'a>( &'a mut self, ) -> Pin + Send + 'a>>; /// Send a request to a remote peer. async fn start_request( &self, authority_discovery: &mut AD, req: Requests, if_disconnected: IfDisconnected, ); /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar. fn report_peer( &mut self, who: PeerId, cost_benefit: Rep, ) -> BoxFuture> { async move { self.action_sink() .send(NetworkAction::ReputationChange(who, cost_benefit)) .await } .boxed() } /// Disconnect a given peer from the peer set specified without harming reputation. fn disconnect_peer( &mut self, who: PeerId, peer_set: PeerSet, ) -> BoxFuture> { async move { self.action_sink() .send(NetworkAction::DisconnectPeer(who, peer_set)) .await } .boxed() } /// Write a notification to a peer on the given peer-set's protocol. fn write_notification( &mut self, who: PeerId, peer_set: PeerSet, message: Vec, ) -> BoxFuture> { async move { self.action_sink() .send(NetworkAction::WriteNotification(who, peer_set, message)) .await } .boxed() } } #[async_trait] impl Network for Arc> { fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { NetworkService::event_stream(self, "polkadot-network-bridge").boxed() } async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet) -> Result<(), String> { sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses) } async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet) -> Result<(), String> { sc_network::NetworkService::remove_from_peers_set(&**self, protocol, multiaddresses) } fn action_sink<'a>( &'a mut self, ) -> Pin + Send + 'a>> { use futures::task::{Context, Poll}; // wrapper around a NetworkService to make it act like a sink. struct ActionSink<'b>(&'b NetworkService); impl<'b> Sink for ActionSink<'b> { type Error = SubsystemError; fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> { match action { NetworkAction::ReputationChange(peer, cost_benefit) => { self.0.report_peer(peer, cost_benefit.into_base_rep()) } NetworkAction::DisconnectPeer(peer, peer_set) => self .0 .disconnect_peer(peer, peer_set.into_protocol_name()), NetworkAction::WriteNotification(peer, peer_set, message) => self .0 .write_notification(peer, peer_set.into_protocol_name(), message), } Ok(()) } fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> { Poll::Ready(Ok(())) } fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { Poll::Ready(Ok(())) } } Box::pin(ActionSink(&**self)) } async fn start_request( &self, authority_discovery: &mut AD, req: Requests, if_disconnected: IfDisconnected, ) { let ( protocol, OutgoingRequest { peer, payload, pending_response, }, ) = req.encode_request(); let peer_id = match peer { Recipient::Peer(peer_id) => Some(peer_id), Recipient::Authority(authority) => { let mut found_peer_id = None; // Note: `get_addresses_by_authority_id` searched in a cache, and it thus expected // to be very quick. for addr in authority_discovery .get_addresses_by_authority_id(authority).await .into_iter().flat_map(|list| list.into_iter()) { let (peer_id, addr) = match parse_addr(addr) { Ok(v) => v, Err(_) => continue, }; NetworkService::add_known_address( &*self, peer_id.clone(), addr, ); found_peer_id = Some(peer_id); } found_peer_id } }; let peer_id = match peer_id { None => { tracing::debug!(target: LOG_TARGET, "Discovering authority failed"); match pending_response .send(Err(RequestFailure::Network(OutboundFailure::DialFailure))) { Err(_) => tracing::debug!( target: LOG_TARGET, "Sending failed request response failed." ), Ok(_) => {} } return; } Some(peer_id) => peer_id, }; NetworkService::start_request( &*self, peer_id, protocol.into_protocol_name(), payload, pending_response, if_disconnected, ); } }