*: Update to libp2p v0.32.0 (#7696)

* *: Update to libp2p v0.32.0

* Cargo.lock: Update async-tls to 0.10.2

* client/network/request_response: Adjust to new request response events

* client/network/request_response.rs: Clean up silently failing responses

* client/network/discovery: Lazily instantiate mdns

* client/network/discovery: Exclude MdnsWrapper for target_os unknown

* client/network/discovery: Fix indentation

* client/network/request-response: Use LruCache to track pending resp time

* client/network/request_responses: Fix early connection closed error

* client/network/request-response: Replace debug_assert with debug

* client/network/request-response: Fix typo

* client/network/request-response: Don't emit event on send_response fail

* client/network/request-response: Revert waker.wake_by_ref()

* client/network/request-resp: Make duration in InboundRequest optional

* client/network/req-resp: Don't emit two events for busy builder

When a response builder is busy incoming requests are dropped.
Previously this was reported both via a `ResponseFailure::Busy` and a
`ReponseFailure::Network(InboundFailure::Omisssion)` event.

With this commit the former is removed, leaving only the latter in
place.
This commit is contained in:
Max Inden
2020-12-09 22:58:22 +01:00
committed by GitHub
parent b29d6092c2
commit 0a2636d20c
14 changed files with 225 additions and 131 deletions
+105 -66
View File
@@ -48,9 +48,10 @@ use libp2p::{
PollParameters, ProtocolsHandler,
},
};
use lru::LruCache;
use std::{
borrow::Cow, collections::{hash_map::Entry, HashMap}, convert::TryFrom as _, io, iter,
pin::Pin, task::{Context, Poll}, time::Duration,
pin::Pin, task::{Context, Poll}, time::{Duration, Instant},
};
pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId};
@@ -128,7 +129,10 @@ pub enum Event {
protocol: Cow<'static, str>,
/// If `Ok`, contains the time elapsed between when we received the request and when we
/// sent back the response. If `Err`, the error that happened.
result: Result<Duration, ResponseFailure>,
///
/// Note: Given that response time is tracked on a best-effort basis only, `Ok(time)` can be
/// `None`.
result: Result<Option<Duration>, ResponseFailure>,
},
/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
@@ -154,21 +158,19 @@ pub struct RequestResponsesBehaviour {
/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
/// response to send back to the remote.
pending_responses: stream::FuturesUnordered<
Pin<Box<dyn Future<Output = RequestProcessingOutcome> + Send>>
Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>
>,
/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
pending_responses_arrival_time: LruCache<RequestId, Instant>,
}
/// Generated by the response builder and waiting to be processed.
enum RequestProcessingOutcome {
Response {
protocol: Cow<'static, str>,
inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
response: Vec<u8>,
},
Busy {
peer: PeerId,
protocol: Cow<'static, str>,
},
struct RequestProcessingOutcome {
request_id: RequestId,
protocol: Cow<'static, str>,
inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
response: Vec<u8>,
}
impl RequestResponsesBehaviour {
@@ -201,7 +203,8 @@ impl RequestResponsesBehaviour {
Ok(Self {
protocols,
pending_responses: stream::FuturesUnordered::new(),
pending_responses: Default::default(),
pending_responses_arrival_time: LruCache::new(1_000),
})
}
@@ -347,22 +350,31 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
> {
'poll_all: loop {
// Poll to see if any response is ready to be sent back.
while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) {
match result {
RequestProcessingOutcome::Response {
protocol, inner_channel, response
} => {
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol) {
protocol.send_response(inner_channel, Ok(response));
}
}
RequestProcessingOutcome::Busy { peer, protocol } => {
let out = Event::InboundRequest {
peer,
protocol,
result: Err(ResponseFailure::Busy),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
let RequestProcessingOutcome {
request_id,
protocol: protocol_name,
inner_channel,
response
} = match outcome {
Some(outcome) => outcome,
// The response builder was too busy and thus the request was dropped. This is
// later on reported as a `InboundFailure::Omission`.
None => continue,
};
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
// Note: In case this happened due to a timeout, the corresponding
// `RequestResponse` behaviour will emit an `InboundFailure::Timeout` event.
self.pending_responses_arrival_time.pop(&request_id);
log::debug!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?} due to a \
timeout or due to the connection to the peer being closed. \
Dropping response",
request_id, protocol_name,
);
}
}
}
@@ -409,15 +421,21 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Received a request from a remote.
RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Request { request, channel, .. },
message: RequestResponseMessage::Request { request_id, request, channel, .. },
} => {
self.pending_responses_arrival_time.put(
request_id.clone(),
Instant::now(),
);
let (tx, rx) = oneshot::channel();
// Submit the request to the "response builder" passed by the user at
// initialization.
if let Some(resp_builder) = resp_builder {
// If the response builder is too busy, silently drop `tx`.
// This will be reported as a `Busy` error.
// If the response builder is too busy, silently drop `tx`. This
// will be reported by the corresponding `RequestResponse` through
// an `InboundFailure::Omission` event.
let _ = resp_builder.try_send(IncomingRequest {
peer: peer.clone(),
payload: request,
@@ -428,13 +446,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
let protocol = protocol.clone();
self.pending_responses.push(Box::pin(async move {
// The `tx` created above can be dropped if we are not capable of
// processing this request, which is reflected as a "Busy" error.
// processing this request, which is reflected as a
// `InboundFailure::Omission` event.
if let Ok(response) = rx.await {
RequestProcessingOutcome::Response {
protocol, inner_channel: channel, response
}
Some(RequestProcessingOutcome {
request_id, protocol, inner_channel: channel, response
})
} else {
RequestProcessingOutcome::Busy { peer, protocol }
None
}
}));
@@ -445,11 +464,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Received a response from a remote to one of our requests.
RequestResponseEvent::Message {
message:
RequestResponseMessage::Response {
request_id,
response,
},
message: RequestResponseMessage::Response {
request_id,
response,
},
..
} => {
let out = Event::RequestFinished {
@@ -472,8 +490,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
// Remote has tried to send a request but failed.
RequestResponseEvent::InboundFailure { peer, error, .. } => {
// An inbound request failed, either while reading the request or due to failing
// to send a response.
RequestResponseEvent::InboundFailure { request_id, peer, error, .. } => {
self.pending_responses_arrival_time.pop(&request_id);
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
@@ -481,6 +501,24 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
RequestResponseEvent::ResponseSent { request_id, peer } => {
let arrival_time = self.pending_responses_arrival_time.pop(&request_id)
.map(|t| t.elapsed());
if arrival_time.is_none() {
log::debug!(
"Expected to find arrival time for sent response. Is the LRU \
cache size set too small?",
);
}
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
result: Ok(arrival_time),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
};
}
}
@@ -520,8 +558,6 @@ pub enum RequestFailure {
/// Error when processing a request sent by a remote.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum ResponseFailure {
/// Internal response builder is too busy to process this request.
Busy,
/// Problem on the network.
#[display(fmt = "Problem on the network")]
Network(#[error(ignore)] InboundFailure),
@@ -655,7 +691,10 @@ impl RequestResponseCodec for GenericCodec {
#[cfg(test)]
mod tests {
use futures::{channel::mpsc, prelude::*};
use futures::channel::mpsc;
use futures::executor::LocalPool;
use futures::prelude::*;
use futures::task::Spawn;
use libp2p::identity::Keypair;
use libp2p::Multiaddr;
use libp2p::core::upgrade;
@@ -666,7 +705,8 @@ mod tests {
#[test]
fn basic_request_response_works() {
let protocol_name = "/test/req-rep/1";
let protocol_name = "/test/req-resp/1";
let mut pool = LocalPool::new();
// Build swarms whose behaviour is `RequestResponsesBehaviour`.
let mut swarms = (0..2)
@@ -694,12 +734,12 @@ mod tests {
inbound_queue: Some(tx),
})).unwrap();
async_std::task::spawn(async move {
pool.spawner().spawn_obj(async move {
while let Some(rq) = rx.next().await {
assert_eq!(rq.payload, b"this is a request");
let _ = rq.pending_response.send(b"this is a response".to_vec());
}
});
}.boxed().into()).unwrap();
b
};
@@ -719,26 +759,24 @@ mod tests {
Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap();
}
// Running `swarm[0]` in the background until a `InboundRequest` event happens,
// which is a hint about the test having ended.
async_std::task::spawn({
// Running `swarm[0]` in the background.
pool.spawner().spawn_obj({
let (mut swarm, _) = swarms.remove(0);
async move {
loop {
match swarm.next_event().await {
SwarmEvent::Behaviour(super::Event::InboundRequest { result, .. }) => {
assert!(result.is_ok());
break
result.unwrap();
},
_ => {}
}
}
}
});
}.boxed().into()
}).unwrap();
// Remove and run the remaining swarm.
let (mut swarm, _) = swarms.remove(0);
async_std::task::block_on(async move {
pool.run_until(async move {
let mut sent_request_id = None;
loop {
@@ -769,7 +807,8 @@ mod tests {
#[test]
fn max_response_size_exceeded() {
let protocol_name = "/test/req-rep/1";
let protocol_name = "/test/req-resp/1";
let mut pool = LocalPool::new();
// Build swarms whose behaviour is `RequestResponsesBehaviour`.
let mut swarms = (0..2)
@@ -797,12 +836,12 @@ mod tests {
inbound_queue: Some(tx),
})).unwrap();
async_std::task::spawn(async move {
pool.spawner().spawn_obj(async move {
while let Some(rq) = rx.next().await {
assert_eq!(rq.payload, b"this is a request");
let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec());
}
});
}.boxed().into()).unwrap();
b
};
@@ -824,7 +863,7 @@ mod tests {
// Running `swarm[0]` in the background until a `InboundRequest` event happens,
// which is a hint about the test having ended.
async_std::task::spawn({
pool.spawner().spawn_obj({
let (mut swarm, _) = swarms.remove(0);
async move {
loop {
@@ -836,12 +875,12 @@ mod tests {
_ => {}
}
}
}
});
}.boxed().into()
}).unwrap();
// Remove and run the remaining swarm.
let (mut swarm, _) = swarms.remove(0);
async_std::task::block_on(async move {
pool.run_until(async move {
let mut sent_request_id = None;
loop {