diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 08d061ee26..7b334175a2 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -38,6 +38,7 @@ use libp2p::{ use log::debug; use prost::Message; use sc_consensus::import_queue::{IncomingBlock, Origin}; +use sc_peerset::PeersetHandle; use sp_consensus::BlockOrigin; use sp_runtime::{ traits::{Block as BlockT, NumberFor}, @@ -206,6 +207,7 @@ impl Behaviour { light_client_request_protocol_config: request_responses::ProtocolConfig, // All remaining request protocol configs. mut request_response_protocols: Vec, + peerset: PeersetHandle, ) -> Result { // Extract protocol name and add to `request_response_protocols`. let block_request_protocol_name = block_request_protocol_config.name.to_string(); @@ -229,6 +231,7 @@ impl Behaviour { bitswap: bitswap.into(), request_responses: request_responses::RequestResponsesBehaviour::new( request_response_protocols.into_iter(), + peerset, )?, light_client_request_sender, events: VecDeque::new(), diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 6ebc7416c2..0908d7510e 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -64,6 +64,7 @@ use std::{ }; pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; +use sc_peerset::{PeersetHandle, BANNED_THRESHOLD}; /// Configuration for a single request-response protocol. #[derive(Debug, Clone)] @@ -256,6 +257,27 @@ pub struct RequestResponsesBehaviour { /// Whenever a response is received on `pending_responses`, insert a channel to be notified /// when the request has been sent out. send_feedback: HashMap>, + + /// Primarily used to get a reputation of a node. + peerset: PeersetHandle, + + /// Pending message request, holds `MessageRequest` as a Future state to poll it + /// until we get a response from `Peerset` + message_request: Option, +} + +// This is a state of processing incoming request Message. +// The main reason of this struct is to hold `get_peer_reputation` as a Future state. +struct MessageRequest { + peer: PeerId, + request_id: RequestId, + request: Vec, + channel: ResponseChannel, ()>>, + protocol: String, + resp_builder: Option>, + // Once we get incoming request we save all params, create an async call to Peerset + // to get the reputation of the peer. + get_peer_reputation: Pin> + Send>>, } /// Generated by the response builder and waiting to be processed. @@ -270,7 +292,10 @@ struct RequestProcessingOutcome { impl RequestResponsesBehaviour { /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if /// the same protocol is passed twice. - pub fn new(list: impl Iterator) -> Result { + pub fn new( + list: impl Iterator, + peerset: PeersetHandle, + ) -> Result { let mut protocols = HashMap::new(); for protocol in list { let mut cfg = RequestResponseConfig::default(); @@ -304,6 +329,8 @@ impl RequestResponsesBehaviour { pending_responses: Default::default(), pending_responses_arrival_time: Default::default(), send_feedback: Default::default(), + peerset, + message_request: None, }) } @@ -492,6 +519,93 @@ impl NetworkBehaviour for RequestResponsesBehaviour { >, > { 'poll_all: loop { + if let Some(message_request) = self.message_request.take() { + // Now we can can poll `MessageRequest` until we get the reputation + + let MessageRequest { + peer, + request_id, + request, + channel, + protocol, + resp_builder, + mut get_peer_reputation, + } = message_request; + + let reputation = Future::poll(Pin::new(&mut get_peer_reputation), cx); + match reputation { + Poll::Pending => { + // Save the state to poll it again next time. + + self.message_request = Some(MessageRequest { + peer, + request_id, + request, + channel, + protocol, + resp_builder, + get_peer_reputation, + }); + return Poll::Pending + }, + Poll::Ready(reputation) => { + // Once we get the reputation we can continue processing the request. + + let reputation = reputation.expect( + "The channel can only be closed if the peerset no longer exists; qed", + ); + + if reputation < BANNED_THRESHOLD { + log::debug!( + target: "sub-libp2p", + "Cannot handle requests from a node with a low reputation {}: {}", + peer, + reputation, + ); + continue 'poll_all + } + + let (tx, rx) = oneshot::channel(); + + // Submit the request to the "response builder" passed by the user at + // initialization. + if let Some(mut resp_builder) = resp_builder { + // If the response builder is too busy, silently drop `tx`. This + // will be reported by the corresponding `RequestResponse` through + // an `InboundFailure::Omission` event. + let _ = resp_builder.try_send(IncomingRequest { + peer: peer.clone(), + payload: request, + pending_response: tx, + }); + } else { + debug_assert!(false, "Received message on outbound-only protocol."); + } + + let protocol = Cow::from(protocol); + self.pending_responses.push(Box::pin(async move { + // The `tx` created above can be dropped if we are not capable of + // processing this request, which is reflected as a + // `InboundFailure::Omission` event. + if let Ok(response) = rx.await { + Some(RequestProcessingOutcome { + peer, + request_id, + protocol, + inner_channel: channel, + response, + }) + } else { + None + } + })); + + // This `continue` makes sure that `pending_responses` gets polled + // after we have added the new element. + continue 'poll_all + }, + } + } // Poll to see if any response is ready to be sent back. while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { let RequestProcessingOutcome { @@ -585,42 +699,24 @@ impl NetworkBehaviour for RequestResponsesBehaviour { Instant::now(), ); - let (tx, rx) = oneshot::channel(); + let get_peer_reputation = + self.peerset.clone().peer_reputation(peer.clone()); + let get_peer_reputation = Box::pin(get_peer_reputation); - // Submit the request to the "response builder" passed by the user at - // initialization. - if let Some(resp_builder) = resp_builder { - // If the response builder is too busy, silently drop `tx`. This - // will be reported by the corresponding `RequestResponse` through - // an `InboundFailure::Omission` event. - let _ = resp_builder.try_send(IncomingRequest { - peer: peer.clone(), - payload: request, - pending_response: tx, - }); - } else { - debug_assert!(false, "Received message on outbound-only protocol."); - } + // Save the Future-like state with params to poll `get_peer_reputation` + // and to continue processing the request once we get the reputation of + // the peer. + self.message_request = Some(MessageRequest { + peer, + request_id, + request, + channel, + protocol: protocol.to_string(), + resp_builder: resp_builder.clone(), + get_peer_reputation, + }); - let protocol = protocol.clone(); - self.pending_responses.push(Box::pin(async move { - // The `tx` created above can be dropped if we are not capable of - // processing this request, which is reflected as a - // `InboundFailure::Omission` event. - if let Ok(response) = rx.await { - Some(RequestProcessingOutcome { - peer, - request_id, - protocol, - inner_channel: channel, - response, - }) - } else { - None - } - })); - - // This `continue` makes sure that `pending_responses` gets polled + // This `continue` makes sure that `message_request` gets polled // after we have added the new element. continue 'poll_all }, @@ -934,11 +1030,12 @@ mod tests { swarm::{Swarm, SwarmEvent}, Multiaddr, }; + use sc_peerset::{Peerset, PeersetConfig, SetConfig}; use std::{iter, time::Duration}; fn build_swarm( list: impl Iterator, - ) -> (Swarm, Multiaddr) { + ) -> (Swarm, Multiaddr, Peerset) { let keypair = Keypair::generate_ed25519(); let noise_keys = @@ -950,13 +1047,29 @@ mod tests { .multiplex(libp2p::yamux::YamuxConfig::default()) .boxed(); - let behaviour = RequestResponsesBehaviour::new(list).unwrap(); + let config = PeersetConfig { + sets: vec![SetConfig { + in_peers: u32::max_value(), + out_peers: u32::max_value(), + bootnodes: vec![], + reserved_nodes: Default::default(), + reserved_only: false, + }], + }; + + let (peerset, handle) = Peerset::from_config(config); + + let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap(); let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); swarm.listen_on(listen_addr.clone()).unwrap(); - (swarm, listen_addr) + (swarm, listen_addr, peerset) + } + + async fn loop_peerset(peerset: Peerset) { + let _: Vec<_> = peerset.collect().await; } #[test] @@ -1007,10 +1120,12 @@ mod tests { Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap(); } + let (mut swarm, _, peerset) = swarms.remove(0); + // Process every peerset event in the background. + pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); // Running `swarm[0]` in the background. pool.spawner() .spawn_obj({ - let (mut swarm, _) = swarms.remove(0); async move { loop { match swarm.select_next_some().await { @@ -1027,7 +1142,9 @@ mod tests { .unwrap(); // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); + let (mut swarm, _, peerset) = swarms.remove(0); + // Process every peerset event in the background. + pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); pool.run_until(async move { let mut response_receiver = None; @@ -1105,9 +1222,11 @@ mod tests { // Running `swarm[0]` in the background until a `InboundRequest` event happens, // which is a hint about the test having ended. + let (mut swarm, _, peerset) = swarms.remove(0); + // Process every peerset event in the background. + pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); pool.spawner() .spawn_obj({ - let (mut swarm, _) = swarms.remove(0); async move { loop { match swarm.select_next_some().await { @@ -1125,7 +1244,9 @@ mod tests { .unwrap(); // Remove and run the remaining swarm. - let (mut swarm, _) = swarms.remove(0); + let (mut swarm, _, peerset) = swarms.remove(0); + // Process every peerset event in the background. + pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); pool.run_until(async move { let mut response_receiver = None; @@ -1195,7 +1316,7 @@ mod tests { build_swarm(protocol_configs.into_iter()).0 }; - let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = { + let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2, peerset) = { let (tx_1, rx_1) = mpsc::channel(64); let (tx_2, rx_2) = mpsc::channel(64); @@ -1216,10 +1337,12 @@ mod tests { }, ]; - let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter()); + let (swarm, listen_addr, peerset) = build_swarm(protocol_configs.into_iter()); - (swarm, rx_1, rx_2, listen_addr) + (swarm, rx_1, rx_2, listen_addr, peerset) }; + // Process every peerset event in the background. + pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap(); // Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test, // so they wouldn't connect to each other. diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 525470145b..23f9c614d9 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -355,6 +355,7 @@ impl NetworkWorker { bitswap, params.light_client_request_protocol_config, params.network_config.request_response_protocols, + peerset_handle.clone(), ); match result { diff --git a/substrate/client/peerset/src/lib.rs b/substrate/client/peerset/src/lib.rs index 9c6c5617c3..0775354bef 100644 --- a/substrate/client/peerset/src/lib.rs +++ b/substrate/client/peerset/src/lib.rs @@ -34,7 +34,7 @@ mod peersstate; -use futures::prelude::*; +use futures::{channel::oneshot, prelude::*}; use log::{debug, error, trace}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use serde_json::json; @@ -49,7 +49,7 @@ use wasm_timer::Delay; pub use libp2p::PeerId; /// We don't accept nodes whose reputation is under this value. -const BANNED_THRESHOLD: i32 = 82 * (i32::MIN / 100); +pub const BANNED_THRESHOLD: i32 = 82 * (i32::MIN / 100); /// Reputation change for a node when we get disconnected from it. const DISCONNECT_REPUTATION_CHANGE: i32 = -256; /// Amount of time between the moment we disconnect from a node and the moment we remove it from @@ -65,6 +65,7 @@ enum Action { ReportPeer(PeerId, ReputationChange), AddToPeersSet(SetId, PeerId), RemoveFromPeersSet(SetId, PeerId), + PeerReputation(PeerId, oneshot::Sender), } /// Identifier of a set in the peerset. @@ -165,6 +166,16 @@ impl PeersetHandle { pub fn remove_from_peers_set(&self, set_id: SetId, peer_id: PeerId) { let _ = self.tx.unbounded_send(Action::RemoveFromPeersSet(set_id, peer_id)); } + + /// Returns the reputation value of the peer. + pub async fn peer_reputation(self, peer_id: PeerId) -> Result { + let (tx, rx) = oneshot::channel(); + + let _ = self.tx.unbounded_send(Action::PeerReputation(peer_id, tx)); + + // The channel can only be closed if the peerset no longer exists. + rx.await.map_err(|_| ()) + } } /// Message that can be sent by the peer set manager (PSM). @@ -454,6 +465,11 @@ impl Peerset { } } + fn on_peer_reputation(&mut self, peer_id: PeerId, pending_response: oneshot::Sender) { + let reputation = self.data.peer_reputation(peer_id); + let _ = pending_response.send(reputation.reputation()); + } + /// Updates the value of `self.latest_time_update` and performs all the updates that happen /// over time, such as reputation increases for staying connected. fn update_time(&mut self) { @@ -744,6 +760,8 @@ impl Stream for Peerset { self.add_to_peers_set(sets_name, peer_id), Action::RemoveFromPeersSet(sets_name, peer_id) => self.on_remove_from_peers_set(sets_name, peer_id), + Action::PeerReputation(peer_id, pending_response) => + self.on_peer_reputation(peer_id, pending_response), } } }