diff --git a/substrate/client/finality-grandpa-warp-sync/src/lib.rs b/substrate/client/finality-grandpa-warp-sync/src/lib.rs index 52e18e3890..a43aaf0305 100644 --- a/substrate/client/finality-grandpa-warp-sync/src/lib.rs +++ b/substrate/client/finality-grandpa-warp-sync/src/lib.rs @@ -135,6 +135,7 @@ impl> GrandpaWarpSyncRequestHandler BlockRequestHandler { pending_response.send(OutgoingResponse { result, reputation_changes, + sent_feedback: None, }).map_err(|_| HandleRequestError::SendResponse) } diff --git a/substrate/client/network/src/light_client_requests/handler.rs b/substrate/client/network/src/light_client_requests/handler.rs index fe0a3cb187..cf2ef70686 100644 --- a/substrate/client/network/src/light_client_requests/handler.rs +++ b/substrate/client/network/src/light_client_requests/handler.rs @@ -82,7 +82,12 @@ impl LightClientRequestHandler { match self.handle_request(peer, payload) { Ok(response_data) => { - let response = OutgoingResponse { result: Ok(response_data), reputation_changes: Vec::new() }; + let response = OutgoingResponse { + result: Ok(response_data), + reputation_changes: Vec::new(), + sent_feedback: None + }; + match pending_response.send(response) { Ok(()) => debug!( target: LOG_TARGET, @@ -110,7 +115,12 @@ impl LightClientRequestHandler { _ => Vec::new(), }; - let response = OutgoingResponse { result: Err(()), reputation_changes }; + let response = OutgoingResponse { + result: Err(()), + reputation_changes, + sent_feedback: None + }; + if pending_response.send(response).is_err() { debug!( target: LOG_TARGET, diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index e8ca2795ea..1b23ee3699 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -133,9 +133,20 @@ pub struct OutgoingResponse { /// /// `Err(())` if none is available e.g. due an error while handling the request. pub result: Result, ()>, + /// Reputation changes accrued while handling the request. To be applied to the reputation of /// the peer sending the request. pub reputation_changes: Vec, + + /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the + /// peer. + /// + /// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of + /// > outgoing data for each TCP socket, and it is not possible for a user + /// > application to inspect this buffer. This channel here is not actually notified + /// > when the response has been fully sent out, but rather when it has fully been + /// > written to the buffer managed by the operating system. + pub sent_feedback: Option>, } /// Event generated by the [`RequestResponsesBehaviour`]. @@ -240,6 +251,10 @@ pub struct RequestResponsesBehaviour { /// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here. pending_responses_arrival_time: HashMap, + + /// Whenever a response is received on `pending_responses`, insert a channel to be notified + /// when the request has been sent out. + send_feedback: HashMap>, } /// Generated by the response builder and waiting to be processed. @@ -284,6 +299,7 @@ impl RequestResponsesBehaviour { pending_requests: Default::default(), pending_responses: Default::default(), pending_responses_arrival_time: Default::default(), + send_feedback: Default::default(), }) } @@ -463,6 +479,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { response: OutgoingResponse { result, reputation_changes, + sent_feedback, }, } = match outcome { Some(outcome) => outcome, @@ -483,6 +500,13 @@ impl NetworkBehaviour for RequestResponsesBehaviour { Dropping response", request_id, protocol_name, ); + } else { + if let Some(sent_feedback) = sent_feedback { + self.send_feedback.insert( + (protocol_name, request_id).into(), + sent_feedback + ); + } } } } @@ -668,6 +692,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { self.pending_responses_arrival_time.remove( &(protocol.clone(), request_id).into(), ); + self.send_feedback.remove(&(protocol.clone(), request_id).into()); let out = Event::InboundRequest { peer, protocol: protocol.clone(), @@ -690,11 +715,18 @@ impl NetworkBehaviour for RequestResponsesBehaviour { failed; qed.", ); + if let Some(send_feedback) = self.send_feedback.remove( + &(protocol.clone(), request_id).into() + ) { + let _ = send_feedback.send(()); + } + let out = Event::InboundRequest { peer, protocol: protocol.clone(), result: Ok(arrival_time), }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } @@ -914,11 +946,14 @@ mod tests { pool.spawner().spawn_obj(async move { while let Some(rq) = rx.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); assert_eq!(rq.payload, b"this is a request"); let _ = rq.pending_response.send(super::OutgoingResponse { result: Ok(b"this is a response".to_vec()), reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), }); + fb_rx.await.unwrap(); } }.boxed().into()).unwrap(); @@ -1005,6 +1040,7 @@ mod tests { let _ = rq.pending_response.send(super::OutgoingResponse { result: Ok(b"this response exceeds the limit".to_vec()), reputation_changes: Vec::new(), + sent_feedback: None, }); } }.boxed().into()).unwrap(); @@ -1175,6 +1211,7 @@ mod tests { .send(OutgoingResponse { result: Ok(b"this is a response".to_vec()), reputation_changes: Vec::new(), + sent_feedback: None, }) .unwrap(); protocol_2_request.unwrap() @@ -1182,6 +1219,7 @@ mod tests { .send(OutgoingResponse { result: Ok(b"this is a response".to_vec()), reputation_changes: Vec::new(), + sent_feedback: None, }) .unwrap(); }.boxed().into()).unwrap();