Don't answer peers with a low reputation (#9008)

* Init architecture for not answering peers with a low reputation

* Get reputation inside of RequestResponsesBehaviour::poll

* Filter reputation in RequestResponsesBehaviour

* Pass PeersetHandle to RequestResponsesBehaviour

* Add more docs

* Fix tests compilation

* Fix compiler warnings (still FIXME)

* Fix tests

* Fmt code
This commit is contained in:
Roman
2021-09-21 14:32:13 +03:00
committed by GitHub
parent 0930948580
commit f6ab53f3c3
4 changed files with 192 additions and 47 deletions
@@ -38,6 +38,7 @@ use libp2p::{
use log::debug;
use prost::Message;
use sc_consensus::import_queue::{IncomingBlock, Origin};
use sc_peerset::PeersetHandle;
use sp_consensus::BlockOrigin;
use sp_runtime::{
traits::{Block as BlockT, NumberFor},
@@ -206,6 +207,7 @@ impl<B: BlockT> Behaviour<B> {
light_client_request_protocol_config: request_responses::ProtocolConfig,
// All remaining request protocol configs.
mut request_response_protocols: Vec<request_responses::ProtocolConfig>,
peerset: PeersetHandle,
) -> Result<Self, request_responses::RegisterError> {
// Extract protocol name and add to `request_response_protocols`.
let block_request_protocol_name = block_request_protocol_config.name.to_string();
@@ -229,6 +231,7 @@ impl<B: BlockT> Behaviour<B> {
bitswap: bitswap.into(),
request_responses: request_responses::RequestResponsesBehaviour::new(
request_response_protocols.into_iter(),
peerset,
)?,
light_client_request_sender,
events: VecDeque::new(),
+168 -45
View File
@@ -64,6 +64,7 @@ use std::{
};
pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId};
use sc_peerset::{PeersetHandle, BANNED_THRESHOLD};
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
@@ -256,6 +257,27 @@ pub struct RequestResponsesBehaviour {
/// Whenever a response is received on `pending_responses`, insert a channel to be notified
/// when the request has been sent out.
send_feedback: HashMap<ProtocolRequestId, oneshot::Sender<()>>,
/// Primarily used to get a reputation of a node.
peerset: PeersetHandle,
/// Pending message request, holds `MessageRequest` as a Future state to poll it
/// until we get a response from `Peerset`
message_request: Option<MessageRequest>,
}
// This is a state of processing incoming request Message.
// The main reason of this struct is to hold `get_peer_reputation` as a Future state.
struct MessageRequest {
peer: PeerId,
request_id: RequestId,
request: Vec<u8>,
channel: ResponseChannel<Result<Vec<u8>, ()>>,
protocol: String,
resp_builder: Option<futures::channel::mpsc::Sender<IncomingRequest>>,
// Once we get incoming request we save all params, create an async call to Peerset
// to get the reputation of the peer.
get_peer_reputation: Pin<Box<dyn Future<Output = Result<i32, ()>> + Send>>,
}
/// Generated by the response builder and waiting to be processed.
@@ -270,7 +292,10 @@ struct RequestProcessingOutcome {
impl RequestResponsesBehaviour {
/// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
/// the same protocol is passed twice.
pub fn new(list: impl Iterator<Item = ProtocolConfig>) -> Result<Self, RegisterError> {
pub fn new(
list: impl Iterator<Item = ProtocolConfig>,
peerset: PeersetHandle,
) -> Result<Self, RegisterError> {
let mut protocols = HashMap::new();
for protocol in list {
let mut cfg = RequestResponseConfig::default();
@@ -304,6 +329,8 @@ impl RequestResponsesBehaviour {
pending_responses: Default::default(),
pending_responses_arrival_time: Default::default(),
send_feedback: Default::default(),
peerset,
message_request: None,
})
}
@@ -492,6 +519,93 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
>,
> {
'poll_all: loop {
if let Some(message_request) = self.message_request.take() {
// Now we can can poll `MessageRequest` until we get the reputation
let MessageRequest {
peer,
request_id,
request,
channel,
protocol,
resp_builder,
mut get_peer_reputation,
} = message_request;
let reputation = Future::poll(Pin::new(&mut get_peer_reputation), cx);
match reputation {
Poll::Pending => {
// Save the state to poll it again next time.
self.message_request = Some(MessageRequest {
peer,
request_id,
request,
channel,
protocol,
resp_builder,
get_peer_reputation,
});
return Poll::Pending
},
Poll::Ready(reputation) => {
// Once we get the reputation we can continue processing the request.
let reputation = reputation.expect(
"The channel can only be closed if the peerset no longer exists; qed",
);
if reputation < BANNED_THRESHOLD {
log::debug!(
target: "sub-libp2p",
"Cannot handle requests from a node with a low reputation {}: {}",
peer,
reputation,
);
continue 'poll_all
}
let (tx, rx) = oneshot::channel();
// Submit the request to the "response builder" passed by the user at
// initialization.
if let Some(mut resp_builder) = resp_builder {
// If the response builder is too busy, silently drop `tx`. This
// will be reported by the corresponding `RequestResponse` through
// an `InboundFailure::Omission` event.
let _ = resp_builder.try_send(IncomingRequest {
peer: peer.clone(),
payload: request,
pending_response: tx,
});
} else {
debug_assert!(false, "Received message on outbound-only protocol.");
}
let protocol = Cow::from(protocol);
self.pending_responses.push(Box::pin(async move {
// The `tx` created above can be dropped if we are not capable of
// processing this request, which is reflected as a
// `InboundFailure::Omission` event.
if let Ok(response) = rx.await {
Some(RequestProcessingOutcome {
peer,
request_id,
protocol,
inner_channel: channel,
response,
})
} else {
None
}
}));
// This `continue` makes sure that `pending_responses` gets polled
// after we have added the new element.
continue 'poll_all
},
}
}
// Poll to see if any response is ready to be sent back.
while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
let RequestProcessingOutcome {
@@ -585,42 +699,24 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
Instant::now(),
);
let (tx, rx) = oneshot::channel();
let get_peer_reputation =
self.peerset.clone().peer_reputation(peer.clone());
let get_peer_reputation = Box::pin(get_peer_reputation);
// Submit the request to the "response builder" passed by the user at
// initialization.
if let Some(resp_builder) = resp_builder {
// If the response builder is too busy, silently drop `tx`. This
// will be reported by the corresponding `RequestResponse` through
// an `InboundFailure::Omission` event.
let _ = resp_builder.try_send(IncomingRequest {
peer: peer.clone(),
payload: request,
pending_response: tx,
});
} else {
debug_assert!(false, "Received message on outbound-only protocol.");
}
// Save the Future-like state with params to poll `get_peer_reputation`
// and to continue processing the request once we get the reputation of
// the peer.
self.message_request = Some(MessageRequest {
peer,
request_id,
request,
channel,
protocol: protocol.to_string(),
resp_builder: resp_builder.clone(),
get_peer_reputation,
});
let protocol = protocol.clone();
self.pending_responses.push(Box::pin(async move {
// The `tx` created above can be dropped if we are not capable of
// processing this request, which is reflected as a
// `InboundFailure::Omission` event.
if let Ok(response) = rx.await {
Some(RequestProcessingOutcome {
peer,
request_id,
protocol,
inner_channel: channel,
response,
})
} else {
None
}
}));
// This `continue` makes sure that `pending_responses` gets polled
// This `continue` makes sure that `message_request` gets polled
// after we have added the new element.
continue 'poll_all
},
@@ -934,11 +1030,12 @@ mod tests {
swarm::{Swarm, SwarmEvent},
Multiaddr,
};
use sc_peerset::{Peerset, PeersetConfig, SetConfig};
use std::{iter, time::Duration};
fn build_swarm(
list: impl Iterator<Item = ProtocolConfig>,
) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
) -> (Swarm<RequestResponsesBehaviour>, Multiaddr, Peerset) {
let keypair = Keypair::generate_ed25519();
let noise_keys =
@@ -950,13 +1047,29 @@ mod tests {
.multiplex(libp2p::yamux::YamuxConfig::default())
.boxed();
let behaviour = RequestResponsesBehaviour::new(list).unwrap();
let config = PeersetConfig {
sets: vec![SetConfig {
in_peers: u32::max_value(),
out_peers: u32::max_value(),
bootnodes: vec![],
reserved_nodes: Default::default(),
reserved_only: false,
}],
};
let (peerset, handle) = Peerset::from_config(config);
let behaviour = RequestResponsesBehaviour::new(list, handle).unwrap();
let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id());
let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
swarm.listen_on(listen_addr.clone()).unwrap();
(swarm, listen_addr)
(swarm, listen_addr, peerset)
}
async fn loop_peerset(peerset: Peerset) {
let _: Vec<_> = peerset.collect().await;
}
#[test]
@@ -1007,10 +1120,12 @@ mod tests {
Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap();
}
let (mut swarm, _, peerset) = swarms.remove(0);
// Process every peerset event in the background.
pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap();
// Running `swarm[0]` in the background.
pool.spawner()
.spawn_obj({
let (mut swarm, _) = swarms.remove(0);
async move {
loop {
match swarm.select_next_some().await {
@@ -1027,7 +1142,9 @@ mod tests {
.unwrap();
// Remove and run the remaining swarm.
let (mut swarm, _) = swarms.remove(0);
let (mut swarm, _, peerset) = swarms.remove(0);
// Process every peerset event in the background.
pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap();
pool.run_until(async move {
let mut response_receiver = None;
@@ -1105,9 +1222,11 @@ mod tests {
// Running `swarm[0]` in the background until a `InboundRequest` event happens,
// which is a hint about the test having ended.
let (mut swarm, _, peerset) = swarms.remove(0);
// Process every peerset event in the background.
pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap();
pool.spawner()
.spawn_obj({
let (mut swarm, _) = swarms.remove(0);
async move {
loop {
match swarm.select_next_some().await {
@@ -1125,7 +1244,9 @@ mod tests {
.unwrap();
// Remove and run the remaining swarm.
let (mut swarm, _) = swarms.remove(0);
let (mut swarm, _, peerset) = swarms.remove(0);
// Process every peerset event in the background.
pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap();
pool.run_until(async move {
let mut response_receiver = None;
@@ -1195,7 +1316,7 @@ mod tests {
build_swarm(protocol_configs.into_iter()).0
};
let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2, peerset) = {
let (tx_1, rx_1) = mpsc::channel(64);
let (tx_2, rx_2) = mpsc::channel(64);
@@ -1216,10 +1337,12 @@ mod tests {
},
];
let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
let (swarm, listen_addr, peerset) = build_swarm(protocol_configs.into_iter());
(swarm, rx_1, rx_2, listen_addr)
(swarm, rx_1, rx_2, listen_addr, peerset)
};
// Process every peerset event in the background.
pool.spawner().spawn_obj(loop_peerset(peerset).boxed().into()).unwrap();
// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
// so they wouldn't connect to each other.
+1
View File
@@ -355,6 +355,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
bitswap,
params.light_client_request_protocol_config,
params.network_config.request_response_protocols,
peerset_handle.clone(),
);
match result {