mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 14:11:09 +00:00
client/network: Report reputation changes via response (#7958)
* client/network: Report reputation changes via response When handling a request by a remote peer in a request response handler, one might want to in- or de-crease the reputation of the peer. E.g. one might want to decrease the reputation slightly for each request, given that it forces the local node to do work, or one might want to issue a larger reputation change due to a malformed request by the remote peer. Instead of having to pass a peerset handle to each request response handler, this commit suggests to allow handlers to isssue reputation changes via the provided `pending_response` `oneshot` channel. A reputation change issued by a request response handler via the `pending_response` channel is received by the `RequestResponsesBehaviour` which passes the reputation change up as an event to eventually be send to a peerset via a peerset handle. * client/network/req-resp: Use Vec::new instead of None::<Vec<_>> * client/network: Rename Response to OutgoingResponse Given that a request-response request is not called `Request` but `InomingRequest`, rename a request-response response to `OutgoingResponse`. * client/finality-grandpa-warp: Send empty rep change via response
This commit is contained in:
@@ -18,7 +18,7 @@
|
||||
//! [`crate::request_responses::RequestResponsesBehaviour`].
|
||||
|
||||
use codec::Decode;
|
||||
use sc_network::config::{ProtocolId, IncomingRequest, RequestResponseConfig};
|
||||
use sc_network::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig};
|
||||
use sc_client_api::Backend;
|
||||
use sp_runtime::traits::NumberFor;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
@@ -113,7 +113,7 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
|
||||
fn handle_request(
|
||||
&self,
|
||||
payload: Vec<u8>,
|
||||
pending_response: oneshot::Sender<Vec<u8>>
|
||||
pending_response: oneshot::Sender<OutgoingResponse>
|
||||
) -> Result<(), HandleRequestError>
|
||||
where NumberFor<TBlock>: sc_finality_grandpa::BlockNumberOps,
|
||||
{
|
||||
@@ -124,8 +124,10 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
|
||||
self.backend.blockchain(), request.begin, Some(WARP_SYNC_FRAGMENTS_LIMIT), Some(&mut cache)
|
||||
)?;
|
||||
|
||||
pending_response.send(response)
|
||||
.map_err(|_| HandleRequestError::SendResponse)
|
||||
pending_response.send(OutgoingResponse {
|
||||
result: Ok(response),
|
||||
reputation_changes: Vec::new(),
|
||||
}).map_err(|_| HandleRequestError::SendResponse)
|
||||
}
|
||||
|
||||
/// Run [`GrandpaWarpSyncRequestHandler`].
|
||||
|
||||
@@ -369,6 +369,11 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Even
|
||||
peer, protocol, duration, result,
|
||||
});
|
||||
},
|
||||
request_responses::Event::ReputationChanges { peer, changes } => {
|
||||
for change in changes {
|
||||
self.substrate.report_peer(peer, change);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use codec::{Encode, Decode};
|
||||
use crate::chain::Client;
|
||||
use crate::config::ProtocolId;
|
||||
use crate::protocol::{message::BlockAttributes};
|
||||
use crate::request_responses::{IncomingRequest, ProtocolConfig};
|
||||
use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
|
||||
use crate::schema::v1::block_request::FromBlock;
|
||||
use crate::schema::v1::{BlockResponse, Direction};
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
@@ -85,7 +85,7 @@ impl <B: BlockT> BlockRequestHandler<B> {
|
||||
fn handle_request(
|
||||
&self,
|
||||
payload: Vec<u8>,
|
||||
pending_response: oneshot::Sender<Vec<u8>>
|
||||
pending_response: oneshot::Sender<OutgoingResponse>
|
||||
) -> Result<(), HandleRequestError> {
|
||||
let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;
|
||||
|
||||
@@ -181,8 +181,10 @@ impl <B: BlockT> BlockRequestHandler<B> {
|
||||
let mut data = Vec::with_capacity(res.encoded_len());
|
||||
res.encode(&mut data)?;
|
||||
|
||||
pending_response.send(data)
|
||||
.map_err(|_| HandleRequestError::SendResponse)
|
||||
pending_response.send(OutgoingResponse {
|
||||
result: Ok(data),
|
||||
reputation_changes: Vec::new(),
|
||||
}).map_err(|_| HandleRequestError::SendResponse)
|
||||
}
|
||||
|
||||
/// Run [`BlockRequestHandler`].
|
||||
|
||||
@@ -23,7 +23,11 @@
|
||||
|
||||
pub use crate::chain::Client;
|
||||
pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand};
|
||||
pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig};
|
||||
pub use crate::request_responses::{
|
||||
IncomingRequest,
|
||||
OutgoingResponse,
|
||||
ProtocolConfig as RequestResponseConfig,
|
||||
};
|
||||
pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr};
|
||||
|
||||
// Note: this re-export shouldn't be part of the public API of the crate and will be removed in
|
||||
|
||||
@@ -55,6 +55,7 @@ use std::{
|
||||
pin::Pin, task::{Context, Poll}, time::Duration,
|
||||
};
|
||||
use wasm_timer::Instant;
|
||||
use crate::ReputationChange;
|
||||
|
||||
pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId};
|
||||
|
||||
@@ -114,8 +115,27 @@ pub struct IncomingRequest {
|
||||
/// [`ProtocolConfig::max_request_size`].
|
||||
pub payload: Vec<u8>,
|
||||
|
||||
/// Channel to send back the response to.
|
||||
pub pending_response: oneshot::Sender<Vec<u8>>,
|
||||
/// Channel to send back the response.
|
||||
///
|
||||
/// There are two ways to indicate that handling the request failed:
|
||||
///
|
||||
/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
|
||||
///
|
||||
/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
|
||||
/// the given peer.
|
||||
pub pending_response: oneshot::Sender<OutgoingResponse>,
|
||||
}
|
||||
|
||||
/// Response for an incoming request to be send by a request protocol handler.
|
||||
#[derive(Debug)]
|
||||
pub struct OutgoingResponse {
|
||||
/// The payload of the response.
|
||||
///
|
||||
/// `Err(())` if none is available e.g. due an error while handling the request.
|
||||
pub result: Result<Vec<u8>, ()>,
|
||||
/// Reputation changes accrued while handling the request. To be applied to the reputation of
|
||||
/// the peer sending the request.
|
||||
pub reputation_changes: Vec<ReputationChange>,
|
||||
}
|
||||
|
||||
/// Event generated by the [`RequestResponsesBehaviour`].
|
||||
@@ -150,6 +170,12 @@ pub enum Event {
|
||||
/// Result of the request.
|
||||
result: Result<(), RequestFailure>
|
||||
},
|
||||
|
||||
/// A request protocol handler issued reputation changes for the given peer.
|
||||
ReputationChanges {
|
||||
peer: PeerId,
|
||||
changes: Vec<ReputationChange>,
|
||||
}
|
||||
}
|
||||
|
||||
/// Combination of a protocol name and a request id.
|
||||
@@ -198,10 +224,11 @@ pub struct RequestResponsesBehaviour {
|
||||
|
||||
/// Generated by the response builder and waiting to be processed.
|
||||
struct RequestProcessingOutcome {
|
||||
peer: PeerId,
|
||||
request_id: RequestId,
|
||||
protocol: Cow<'static, str>,
|
||||
inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
|
||||
response: Vec<u8>,
|
||||
response: OutgoingResponse,
|
||||
}
|
||||
|
||||
impl RequestResponsesBehaviour {
|
||||
@@ -406,30 +433,45 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
|
||||
// Poll to see if any response is ready to be sent back.
|
||||
while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
|
||||
let RequestProcessingOutcome {
|
||||
peer,
|
||||
request_id,
|
||||
protocol: protocol_name,
|
||||
inner_channel,
|
||||
response
|
||||
response: OutgoingResponse {
|
||||
result,
|
||||
reputation_changes,
|
||||
},
|
||||
} = match outcome {
|
||||
Some(outcome) => outcome,
|
||||
// The response builder was too busy and thus the request was dropped. This is
|
||||
// The response builder was too busy or handling the request failed. This is
|
||||
// later on reported as a `InboundFailure::Omission`.
|
||||
None => continue,
|
||||
};
|
||||
|
||||
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
|
||||
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
|
||||
// Note: Failure is handled further below when receiving `InboundFailure`
|
||||
// event from `RequestResponse` behaviour.
|
||||
log::debug!(
|
||||
target: "sub-libp2p",
|
||||
"Failed to send response for {:?} on protocol {:?} due to a \
|
||||
timeout or due to the connection to the peer being closed. \
|
||||
Dropping response",
|
||||
request_id, protocol_name,
|
||||
);
|
||||
if let Ok(payload) = result {
|
||||
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
|
||||
if let Err(_) = protocol.send_response(inner_channel, Ok(payload)) {
|
||||
// Note: Failure is handled further below when receiving
|
||||
// `InboundFailure` event from `RequestResponse` behaviour.
|
||||
log::debug!(
|
||||
target: "sub-libp2p",
|
||||
"Failed to send response for {:?} on protocol {:?} due to a \
|
||||
timeout or due to the connection to the peer being closed. \
|
||||
Dropping response",
|
||||
request_id, protocol_name,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !reputation_changes.is_empty() {
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
|
||||
Event::ReputationChanges{
|
||||
peer,
|
||||
changes: reputation_changes,
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// Poll request-responses protocols.
|
||||
@@ -505,7 +547,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
|
||||
// `InboundFailure::Omission` event.
|
||||
if let Ok(response) = rx.await {
|
||||
Some(RequestProcessingOutcome {
|
||||
request_id, protocol, inner_channel: channel, response
|
||||
peer, request_id, protocol, inner_channel: channel, response
|
||||
})
|
||||
} else {
|
||||
None
|
||||
@@ -851,7 +893,10 @@ mod tests {
|
||||
pool.spawner().spawn_obj(async move {
|
||||
while let Some(rq) = rx.next().await {
|
||||
assert_eq!(rq.payload, b"this is a request");
|
||||
let _ = rq.pending_response.send(b"this is a response".to_vec());
|
||||
let _ = rq.pending_response.send(super::OutgoingResponse {
|
||||
result: Ok(b"this is a response".to_vec()),
|
||||
reputation_changes: Vec::new(),
|
||||
});
|
||||
}
|
||||
}.boxed().into()).unwrap();
|
||||
|
||||
@@ -934,7 +979,10 @@ mod tests {
|
||||
pool.spawner().spawn_obj(async move {
|
||||
while let Some(rq) = rx.next().await {
|
||||
assert_eq!(rq.payload, b"this is a request");
|
||||
let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec());
|
||||
let _ = rq.pending_response.send(super::OutgoingResponse {
|
||||
result: Ok(b"this response exceeds the limit".to_vec()),
|
||||
reputation_changes: Vec::new(),
|
||||
});
|
||||
}
|
||||
}.boxed().into()).unwrap();
|
||||
|
||||
@@ -1100,11 +1148,17 @@ mod tests {
|
||||
|
||||
protocol_1_request.unwrap()
|
||||
.pending_response
|
||||
.send(b"this is a response".to_vec())
|
||||
.send(OutgoingResponse {
|
||||
result: Ok(b"this is a response".to_vec()),
|
||||
reputation_changes: Vec::new(),
|
||||
})
|
||||
.unwrap();
|
||||
protocol_2_request.unwrap()
|
||||
.pending_response
|
||||
.send(b"this is a response".to_vec())
|
||||
.send(OutgoingResponse {
|
||||
result: Ok(b"this is a response".to_vec()),
|
||||
reputation_changes: Vec::new(),
|
||||
})
|
||||
.unwrap();
|
||||
}.boxed().into()).unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user