Add a feedback when response is successfully sent (#8510)

* Add a feedback when response is successfully sent

* Fix gp warp sync
This commit is contained in:
Pierre Krieger
2021-04-01 18:25:37 +02:00
committed by GitHub
parent 1fdc8fa1c1
commit 09aa03e3c5
4 changed files with 52 additions and 2 deletions
@@ -135,6 +135,7 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
pending_response.send(OutgoingResponse {
result: Ok(proof.encode()),
reputation_changes: Vec::new(),
sent_feedback: None,
}).map_err(|_| HandleRequestError::SendResponse)
}
@@ -254,6 +254,7 @@ impl<B: BlockT> BlockRequestHandler<B> {
pending_response.send(OutgoingResponse {
result,
reputation_changes,
sent_feedback: None,
}).map_err(|_| HandleRequestError::SendResponse)
}
@@ -82,7 +82,12 @@ impl<B: Block> LightClientRequestHandler<B> {
match self.handle_request(peer, payload) {
Ok(response_data) => {
let response = OutgoingResponse { result: Ok(response_data), reputation_changes: Vec::new() };
let response = OutgoingResponse {
result: Ok(response_data),
reputation_changes: Vec::new(),
sent_feedback: None
};
match pending_response.send(response) {
Ok(()) => debug!(
target: LOG_TARGET,
@@ -110,7 +115,12 @@ impl<B: Block> LightClientRequestHandler<B> {
_ => Vec::new(),
};
let response = OutgoingResponse { result: Err(()), reputation_changes };
let response = OutgoingResponse {
result: Err(()),
reputation_changes,
sent_feedback: None
};
if pending_response.send(response).is_err() {
debug!(
target: LOG_TARGET,
@@ -133,9 +133,20 @@ pub struct OutgoingResponse {
///
/// `Err(())` if none is available e.g. due an error while handling the request.
pub result: Result<Vec<u8>, ()>,
/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<ReputationChange>,
/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
/// peer.
///
/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
/// > outgoing data for each TCP socket, and it is not possible for a user
/// > application to inspect this buffer. This channel here is not actually notified
/// > when the response has been fully sent out, but rather when it has fully been
/// > written to the buffer managed by the operating system.
pub sent_feedback: Option<oneshot::Sender<()>>,
}
/// Event generated by the [`RequestResponsesBehaviour`].
@@ -240,6 +251,10 @@ pub struct RequestResponsesBehaviour {
/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
pending_responses_arrival_time: HashMap<ProtocolRequestId, Instant>,
/// Whenever a response is received on `pending_responses`, insert a channel to be notified
/// when the request has been sent out.
send_feedback: HashMap<ProtocolRequestId, oneshot::Sender<()>>,
}
/// Generated by the response builder and waiting to be processed.
@@ -284,6 +299,7 @@ impl RequestResponsesBehaviour {
pending_requests: Default::default(),
pending_responses: Default::default(),
pending_responses_arrival_time: Default::default(),
send_feedback: Default::default(),
})
}
@@ -463,6 +479,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
response: OutgoingResponse {
result,
reputation_changes,
sent_feedback,
},
} = match outcome {
Some(outcome) => outcome,
@@ -483,6 +500,13 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
Dropping response",
request_id, protocol_name,
);
} else {
if let Some(sent_feedback) = sent_feedback {
self.send_feedback.insert(
(protocol_name, request_id).into(),
sent_feedback
);
}
}
}
}
@@ -668,6 +692,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
self.pending_responses_arrival_time.remove(
&(protocol.clone(), request_id).into(),
);
self.send_feedback.remove(&(protocol.clone(), request_id).into());
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
@@ -690,11 +715,18 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
failed; qed.",
);
if let Some(send_feedback) = self.send_feedback.remove(
&(protocol.clone(), request_id).into()
) {
let _ = send_feedback.send(());
}
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
result: Ok(arrival_time),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
@@ -914,11 +946,14 @@ mod tests {
pool.spawner().spawn_obj(async move {
while let Some(rq) = rx.next().await {
let (fb_tx, fb_rx) = oneshot::channel();
assert_eq!(rq.payload, b"this is a request");
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: Some(fb_tx),
});
fb_rx.await.unwrap();
}
}.boxed().into()).unwrap();
@@ -1005,6 +1040,7 @@ mod tests {
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this response exceeds the limit".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: None,
});
}
}.boxed().into()).unwrap();
@@ -1175,6 +1211,7 @@ mod tests {
.send(OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: None,
})
.unwrap();
protocol_2_request.unwrap()
@@ -1182,6 +1219,7 @@ mod tests {
.send(OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: None,
})
.unwrap();
}.boxed().into()).unwrap();