*: Update to libp2p v0.33.0 (#7759)

* *: Update to libp2p v0.33.0

* client/network: Consistently track request arrival time

With https://github.com/libp2p/rust-libp2p/pull/1886/ one is guaranteed
to receive either a `ResponseSent` or a `InboundFailure` event for each
received inbound request via `RequestResponseEvent::Message`. Given this
guarantee there is no need to track arrival times in a best-effort
manner and thus there is no need to use a LRU cache for arrival times.

* client/offchain: Adjust to PeerId API changes
This commit is contained in:
Max Inden
2021-01-04 12:54:51 +01:00
committed by GitHub
parent 51c37ecc15
commit f0b99dd2f2
15 changed files with 88 additions and 91 deletions
+2 -3
View File
@@ -36,7 +36,6 @@ ip_network = "0.3.4"
linked-hash-map = "0.5.2"
linked_hash_set = "0.1.3"
log = "0.4.8"
lru = "0.6.1"
nohash-hasher = "0.2.0"
parking_lot = "0.11.1"
pin-project = "0.4.6"
@@ -64,13 +63,13 @@ wasm-timer = "0.2"
zeroize = "1.2.0"
[dependencies.libp2p]
version = "0.32.2"
version = "0.33.0"
default-features = false
features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"]
[dev-dependencies]
assert_matches = "1.3"
libp2p = { version = "0.32.2", default-features = false }
libp2p = { version = "0.33.0", default-features = false }
quickcheck = "0.9.0"
rand = "0.7.2"
sp-keyring = { version = "2.0.0", path = "../../primitives/keyring" }
+2 -2
View File
@@ -90,7 +90,7 @@ pub enum BehaviourOut<B: BlockT> {
protocol: Cow<'static, str>,
/// If `Ok`, contains the time elapsed between when we received the request and when we
/// sent back the response. If `Err`, the error that happened.
result: Result<Option<Duration>, ResponseFailure>,
result: Result<Duration, ResponseFailure>,
},
/// A request initiated using [`Behaviour::send_request`] has succeeded or failed.
@@ -419,7 +419,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B
self.events.push_back(BehaviourOut::InboundRequest {
peer,
protocol: self.block_requests.protocol_name().to_owned().into(),
result: Ok(Some(total_handling_time)),
result: Ok(total_handling_time),
});
},
block_requests::Event::Response { peer, response, request_duration } => {
@@ -50,7 +50,6 @@ use libp2p::{
PollParameters, ProtocolsHandler,
},
};
use lru::LruCache;
use std::{
borrow::Cow, collections::{hash_map::Entry, HashMap}, convert::TryFrom as _, io, iter,
pin::Pin, task::{Context, Poll}, time::{Duration, Instant},
@@ -129,12 +128,11 @@ pub enum Event {
peer: PeerId,
/// Name of the protocol in question.
protocol: Cow<'static, str>,
/// If `Ok`, contains the time elapsed between when we received the request and when we
/// sent back the response. If `Err`, the error that happened.
/// Whether handling the request was successful or unsuccessful.
///
/// Note: Given that response time is tracked on a best-effort basis only, `Ok(time)` can be
/// `None`.
result: Result<Option<Duration>, ResponseFailure>,
/// When successful contains the time elapsed between when we received the request and when
/// we sent back the response. When unsuccessful contains the failure reason.
result: Result<Duration, ResponseFailure>,
},
/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
@@ -164,7 +162,7 @@ pub struct RequestResponsesBehaviour {
>,
/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
pending_responses_arrival_time: LruCache<RequestId, Instant>,
pending_responses_arrival_time: HashMap<RequestId, Instant>,
}
/// Generated by the response builder and waiting to be processed.
@@ -206,7 +204,7 @@ impl RequestResponsesBehaviour {
Ok(Self {
protocols,
pending_responses: Default::default(),
pending_responses_arrival_time: LruCache::new(1_000),
pending_responses_arrival_time: Default::default(),
})
}
@@ -367,9 +365,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
// Note: In case this happened due to a timeout, the corresponding
// `RequestResponse` behaviour will emit an `InboundFailure::Timeout` event.
self.pending_responses_arrival_time.pop(&request_id);
// Note: Failure is handled further below when receiving `InboundFailure`
// event from `RequestResponse` behaviour.
log::debug!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?} due to a \
@@ -425,7 +422,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
peer,
message: RequestResponseMessage::Request { request_id, request, channel, .. },
} => {
self.pending_responses_arrival_time.put(
self.pending_responses_arrival_time.insert(
request_id.clone(),
Instant::now(),
);
@@ -495,7 +492,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// An inbound request failed, either while reading the request or due to failing
// to send a response.
RequestResponseEvent::InboundFailure { request_id, peer, error, .. } => {
self.pending_responses_arrival_time.pop(&request_id);
self.pending_responses_arrival_time.remove(&request_id);
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
@@ -504,14 +501,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
RequestResponseEvent::ResponseSent { request_id, peer } => {
let arrival_time = self.pending_responses_arrival_time.pop(&request_id)
.map(|t| t.elapsed());
if arrival_time.is_none() {
log::debug!(
"Expected to find arrival time for sent response. Is the LRU \
cache size set too small?",
);
}
let arrival_time = self.pending_responses_arrival_time.remove(&request_id)
.map(|t| t.elapsed())
.expect("To find request arrival time for answered request.");
let out = Event::InboundRequest {
peer,
+3 -4
View File
@@ -1373,14 +1373,11 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. })) => {
if let Some(metrics) = this.metrics.as_ref() {
match result {
Ok(Some(serve_time)) => {
Ok(serve_time) => {
metrics.requests_in_success_total
.with_label_values(&[&protocol])
.observe(serve_time.as_secs_f64());
}
// Response time tracking is happening on a best-effort basis. Ignore
// the event in case response time could not be provided.
Ok(None) => {},
Err(err) => {
let reason = match err {
ResponseFailure::Network(InboundFailure::Timeout) => "timeout",
@@ -1388,6 +1385,8 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
"unsupported",
ResponseFailure::Network(InboundFailure::ResponseOmission) =>
"busy-omitted",
ResponseFailure::Network(InboundFailure::ConnectionClosed) =>
"connection-closed",
};
metrics.requests_in_failure_total
+1 -1
View File
@@ -19,7 +19,7 @@ parking_lot = "0.10.0"
futures = "0.3.4"
futures-timer = "3.0.1"
rand = "0.7.2"
libp2p = { version = "0.32.2", default-features = false }
libp2p = { version = "0.33.0", default-features = false }
sp-consensus = { version = "0.8.0", path = "../../../primitives/consensus/common" }
sc-consensus = { version = "0.8.0", path = "../../consensus/common" }
sc-client-api = { version = "2.0.0", path = "../../api" }