Add a send_request function to NetworkService (#8008)

* Add a `send_request` to `NetworkService`.

This function delivers responses via a provided sender and also allows
for sending requests to currently not connected peers.

* Document caveats of send_request better.

* Fix compilation in certain cases.

* Update docs + introduce IfDisconnected enum

for more readable function calls.

* Doc fix.

* Rename send_request to detached_request.

* Whitespace fix - arrrgh

* Update client/network/src/service.rs

spaces/tabs

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>

* Update client/network/src/request_responses.rs

Documentation fix

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Update client/network/src/service.rs

Typo.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Update client/network/src/service.rs

Better docs.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Update client/network/src/service.rs

Typo.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Update client/network/src/service.rs

Doc improvements.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>

* Remove error in logs on dialing a peer.

This is now valid behaviour.

* Rename detached_request to start_request.

As suggested by @romanb.

* Fix merged master.

* Fix too long lines.

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
This commit is contained in:
Robert Klotzner
2021-02-02 20:52:12 +01:00
committed by GitHub
parent 73386a4215
commit 3628998d3c
3 changed files with 85 additions and 23 deletions
+19 -6
View File
@@ -45,6 +45,7 @@ use std::{
pub use crate::request_responses::{
ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId,
IfDisconnected
};
/// General behaviour of the network. Combines all protocols together.
@@ -248,8 +249,9 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
protocol: &str,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
) {
self.request_responses.send_request(target, protocol, request, pending_response)
self.request_responses.send_request(target, protocol, request, pending_response, connect)
}
/// Returns a shared reference to the user protocol.
@@ -317,7 +319,7 @@ Behaviour<B, H> {
}
self.request_responses.send_request(
&target, &self.block_request_protocol_name, buf, pending_response,
&target, &self.block_request_protocol_name, buf, pending_response, IfDisconnected::ImmediateError,
);
},
CustomMessageOutcome::NotificationStreamOpened { remote, protocol, roles, notifications_sink } => {
@@ -454,11 +456,22 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
use light_client_requests::sender::OutEvent;
while let Poll::Ready(Some(event)) = self.light_client_request_sender.poll_next_unpin(cx) {
while let Poll::Ready(Some(event)) =
self.light_client_request_sender.poll_next_unpin(cx)
{
match event {
OutEvent::SendRequest { target, request, pending_response, protocol_name } => {
self.request_responses.send_request(&target, &protocol_name, request, pending_response)
}
OutEvent::SendRequest {
target,
request,
pending_response,
protocol_name,
} => self.request_responses.send_request(
&target,
&protocol_name,
request,
pending_response,
IfDisconnected::ImmediateError,
),
}
}
@@ -196,6 +196,25 @@ impl From<(Cow<'static, str>, RequestId)> for ProtocolRequestId {
}
}
/// When sending a request, what to do on a disconnected recipient.
pub enum IfDisconnected {
/// Try to connect to the peer.
TryConnect,
/// Just fail if the destination is not yet connected.
ImmediateError,
}
/// Convenience functions for `IfDisconnected`.
impl IfDisconnected {
/// Shall we connect to a disconnected peer?
pub fn should_connect(self) -> bool {
match self {
Self::TryConnect => true,
Self::ImmediateError => false,
}
}
}
/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
pub struct RequestResponsesBehaviour {
/// The multiple sub-protocols, by name.
@@ -269,17 +288,19 @@ impl RequestResponsesBehaviour {
/// Initiates sending a request.
///
/// An error is returned if we are not connected to the target peer or if the protocol doesn't
/// match one that has been registered.
/// If there is no established connection to the target peer, the behavior is determined by the choice of `connect`.
///
/// An error is returned if the protocol doesn't match one that has been registered.
pub fn send_request(
&mut self,
target: &PeerId,
protocol_name: &str,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
) {
if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) {
if protocol.is_connected(target) {
if protocol.is_connected(target) || connect.should_connect() {
let request_id = protocol.send_request(target, request);
let prev_req_id = self.pending_requests.insert(
(protocol_name.to_string().into(), request_id).into(),
@@ -489,7 +510,6 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
return Poll::Ready(NetworkBehaviourAction::DialAddress { address })
}
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
log::error!("The request-response isn't supposed to start dialing peers");
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id,
condition,
@@ -949,6 +969,7 @@ mod tests {
protocol_name,
b"this is a request".to_vec(),
sender,
IfDisconnected::ImmediateError,
);
assert!(response_receiver.is_none());
response_receiver = Some(receiver);
@@ -1037,6 +1058,7 @@ mod tests {
protocol_name,
b"this is a request".to_vec(),
sender,
IfDisconnected::ImmediateError,
);
assert!(response_receiver.is_none());
response_receiver = Some(receiver);
@@ -1179,12 +1201,14 @@ mod tests {
protocol_name_1,
b"this is a request".to_vec(),
sender_1,
IfDisconnected::ImmediateError,
);
swarm_1.send_request(
&peer_id,
protocol_name_2,
b"this is a request".to_vec(),
sender_2,
IfDisconnected::ImmediateError,
);
assert!(response_receivers.is_none());
response_receivers = Some((receiver_1, receiver_2));
+38 -13
View File
@@ -98,7 +98,7 @@ use std::{
task::Poll,
};
pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};
pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, IfDisconnected};
mod metrics;
mod out_events;
@@ -812,9 +812,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// notifications should remain the default ways of communicating information. For example, a
/// peer can announce something through a notification, after which the recipient can obtain
/// more information by performing a request.
/// As such, this function is meant to be called only with peers we are already connected to.
/// Calling this method with a `target` we are not connected to will *not* attempt to connect
/// to said peer.
/// As such, call this function with `IfDisconnected::ImmediateError` for `connect`. This way you
/// will get an error immediately for disconnected peers, instead of waiting for a potentially very
/// long connection attempt, which would suggest that something is wrong anyway, as you are
/// supposed to be connected because of the notification protocol.
///
/// No limit or throttling of concurrent outbound requests per peer and protocol are enforced.
/// Such restrictions, if desired, need to be enforced at the call site(s).
@@ -826,15 +827,12 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
&self,
target: PeerId,
protocol: impl Into<Cow<'static, str>>,
request: Vec<u8>
request: Vec<u8>,
connect: IfDisconnected,
) -> Result<Vec<u8>, RequestFailure> {
let (tx, rx) = oneshot::channel();
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
target,
protocol: protocol.into(),
request,
pending_response: tx
});
self.start_request(target, protocol, request, tx, connect);
match rx.await {
Ok(v) => v,
@@ -845,6 +843,32 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
}
}
/// Variation of `request` which starts a request whose response is delivered on a provided channel.
///
/// Instead of blocking and waiting for a reply, this function returns immediately, sending
/// responses via the passed in sender. This alternative API exists to make it easier to
/// integrate with message passing APIs.
///
/// Keep in mind that the connected receiver might receive a `Canceled` event in case of a
/// closing connection. This is expected behaviour. With `request` you would get a
/// `RequestFailure::Network(OutboundFailure::ConnectionClosed)` in that case.
pub fn start_request(
&self,
target: PeerId,
protocol: impl Into<Cow<'static, str>>,
request: Vec<u8>,
tx: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
) {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request {
target,
protocol: protocol.into(),
request,
pending_response: tx,
connect,
});
}
/// You may call this when new transactons are imported by the transaction pool.
///
/// All transactions will be fetched from the `TransactionPool` that was passed at
@@ -1262,6 +1286,7 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
protocol: Cow<'static, str>,
request: Vec<u8>,
pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
connect: IfDisconnected,
},
DisconnectPeer(PeerId, Cow<'static, str>),
NewBestBlockImported(B::Hash, NumberFor<B>),
@@ -1385,8 +1410,8 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender),
ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => {
this.network_service.send_request(&target, &protocol, request, pending_response);
ServiceToWorkerMsg::Request { target, protocol, request, pending_response, connect } => {
this.network_service.send_request(&target, &protocol, request, pending_response, connect);
},
ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) =>
this.network_service.user_protocol_mut().disconnect_peer(&who, &protocol_name),