mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 14:37:57 +00:00
Add metrics about block requests (#5811)
* Add metrics about block requests * Apply suggestions from code review Co-Authored-By: Max Inden <mail@max-inden.de> Co-authored-by: Gavin Wood <i@gavwood.com> Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@@ -65,6 +65,7 @@ use std::{
|
||||
task::{Context, Poll}
|
||||
};
|
||||
use void::{Void, unreachable};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
// Type alias for convenience.
|
||||
pub type Error = Box<dyn std::error::Error + 'static>;
|
||||
@@ -72,25 +73,44 @@ pub type Error = Box<dyn std::error::Error + 'static>;
|
||||
/// Event generated by the block requests behaviour.
|
||||
#[derive(Debug)]
|
||||
pub enum Event<B: Block> {
|
||||
/// A request came and we answered it.
|
||||
AnsweredRequest {
|
||||
/// Peer which has emitted the request.
|
||||
peer: PeerId,
|
||||
/// Time it took to compute the response.
|
||||
response_build_time: Duration,
|
||||
},
|
||||
|
||||
/// A response to a block request has arrived.
|
||||
Response {
|
||||
peer: PeerId,
|
||||
/// The original request passed to `send_request`.
|
||||
original_request: message::BlockRequest<B>,
|
||||
response: message::BlockResponse<B>,
|
||||
/// Time elapsed between the start of the request and the response.
|
||||
request_duration: Duration,
|
||||
},
|
||||
|
||||
/// A request has been cancelled because the peer has disconnected.
|
||||
/// Disconnects can also happen as a result of violating the network protocol.
|
||||
///
|
||||
/// > **Note**: This event is NOT emitted if a request is overridden by calling `send_request`.
|
||||
/// > For that, you must check the value returned by `send_request`.
|
||||
RequestCancelled {
|
||||
peer: PeerId,
|
||||
/// The original request passed to `send_request`.
|
||||
original_request: message::BlockRequest<B>,
|
||||
/// Time elapsed between the start of the request and the cancellation.
|
||||
request_duration: Duration,
|
||||
},
|
||||
|
||||
/// A request has timed out.
|
||||
RequestTimeout {
|
||||
peer: PeerId,
|
||||
/// The original request passed to `send_request`.
|
||||
original_request: message::BlockRequest<B>,
|
||||
/// Time elapsed between the start of the request and the timeout.
|
||||
request_duration: Duration,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -184,10 +204,32 @@ struct Connection<B: Block> {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct OngoingRequest<B: Block> {
|
||||
/// `Instant` when the request has been emitted. Used for diagnostic purposes.
|
||||
emitted: Instant,
|
||||
request: message::BlockRequest<B>,
|
||||
timeout: Delay,
|
||||
}
|
||||
|
||||
/// Outcome of calling `send_request`.
|
||||
#[derive(Debug)]
|
||||
#[must_use]
|
||||
pub enum SendRequestOutcome<B: Block> {
|
||||
/// Request has been emitted.
|
||||
Ok,
|
||||
/// The request has been emitted and has replaced an existing request.
|
||||
Replaced {
|
||||
/// The previously-emitted request.
|
||||
previous: message::BlockRequest<B>,
|
||||
/// Time that had elapsed since `previous` has been emitted.
|
||||
request_duration: Duration,
|
||||
},
|
||||
/// Didn't start a request because we have no connection to this node.
|
||||
/// If `send_request` returns that, it is as if the function had never been called.
|
||||
NotConnected,
|
||||
/// Error while serializing the request.
|
||||
EncodeError(prost::EncodeError),
|
||||
}
|
||||
|
||||
impl<B> BlockRequests<B>
|
||||
where
|
||||
B: Block,
|
||||
@@ -202,13 +244,18 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the libp2p protocol name used on the wire (e.g. `/foo/sync/2`).
|
||||
pub fn protocol_name(&self) -> &[u8] {
|
||||
&self.config.protocol
|
||||
}
|
||||
|
||||
/// Issue a new block request.
|
||||
///
|
||||
/// Cancels any existing request targeting the same `PeerId`.
|
||||
///
|
||||
/// If the response doesn't arrive in time, or if the remote answers improperly, the target
|
||||
/// will be disconnected.
|
||||
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) {
|
||||
pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) -> SendRequestOutcome<B> {
|
||||
// Determine which connection to send the request to.
|
||||
let connection = if let Some(peer) = self.peers.get_mut(target) {
|
||||
// We don't want to have multiple requests for any given node, so in priority try to
|
||||
@@ -222,10 +269,10 @@ where
|
||||
target: "sync",
|
||||
"State inconsistency: empty list of peer connections"
|
||||
);
|
||||
return;
|
||||
return SendRequestOutcome::NotConnected;
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
return SendRequestOutcome::NotConnected;
|
||||
};
|
||||
|
||||
let protobuf_rq = api::v1::BlockRequest {
|
||||
@@ -252,17 +299,12 @@ where
|
||||
protobuf_rq,
|
||||
err
|
||||
);
|
||||
return;
|
||||
return SendRequestOutcome::EncodeError(err);
|
||||
}
|
||||
|
||||
if let Some(rq) = &connection.ongoing_request {
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Replacing existing block request on connection {:?}",
|
||||
connection.id
|
||||
);
|
||||
}
|
||||
let previous_request = connection.ongoing_request.take();
|
||||
connection.ongoing_request = Some(OngoingRequest {
|
||||
emitted: Instant::now(),
|
||||
request: req.clone(),
|
||||
timeout: Delay::new(self.config.request_timeout),
|
||||
});
|
||||
@@ -278,6 +320,20 @@ where
|
||||
protocol: self.config.protocol.clone(),
|
||||
},
|
||||
});
|
||||
|
||||
if let Some(previous_request) = previous_request {
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
"Replacing existing block request on connection {:?}",
|
||||
connection.id
|
||||
);
|
||||
SendRequestOutcome::Replaced {
|
||||
previous: previous_request.request,
|
||||
request_duration: previous_request.emitted.elapsed(),
|
||||
}
|
||||
} else {
|
||||
SendRequestOutcome::Ok
|
||||
}
|
||||
}
|
||||
|
||||
/// Callback, invoked when a new block request has been received from remote.
|
||||
@@ -445,6 +501,7 @@ where
|
||||
let ev = Event::RequestCancelled {
|
||||
peer: peer_id.clone(),
|
||||
original_request: ongoing_request.request.clone(),
|
||||
request_duration: ongoing_request.emitted.elapsed(),
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
@@ -476,6 +533,8 @@ where
|
||||
) {
|
||||
match node_event {
|
||||
NodeEvent::Request(request, mut stream) => {
|
||||
let before_answer_build = Instant::now();
|
||||
|
||||
match self.on_block_request(&peer, &request) {
|
||||
Ok(res) => {
|
||||
log::trace!(
|
||||
@@ -508,6 +567,12 @@ where
|
||||
"Error handling block request from peer {}: {}", peer, e
|
||||
)
|
||||
}
|
||||
|
||||
let ev = Event::AnsweredRequest {
|
||||
peer: peer.clone(),
|
||||
response_build_time: before_answer_build.elapsed(),
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
NodeEvent::Response(original_request, response) => {
|
||||
log::trace!(
|
||||
@@ -515,11 +580,13 @@ where
|
||||
"Received block response from peer {} with {} blocks",
|
||||
peer, response.blocks.len()
|
||||
);
|
||||
if let Some(connections) = self.peers.get_mut(&peer) {
|
||||
let request_duration = if let Some(connections) = self.peers.get_mut(&peer) {
|
||||
if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) {
|
||||
if let Some(ongoing_request) = &mut connection.ongoing_request {
|
||||
if ongoing_request.request == original_request {
|
||||
let request_duration = ongoing_request.emitted.elapsed();
|
||||
connection.ongoing_request = None;
|
||||
request_duration
|
||||
} else {
|
||||
// We're no longer interested in that request.
|
||||
log::debug!(
|
||||
@@ -550,7 +617,7 @@ where
|
||||
peer
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let blocks = response.blocks.into_iter().map(|block_data| {
|
||||
Ok(message::BlockData::<B> {
|
||||
@@ -594,6 +661,7 @@ where
|
||||
peer,
|
||||
original_request,
|
||||
response: message::BlockResponse::<B> { id, blocks },
|
||||
request_duration,
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
@@ -625,6 +693,7 @@ where
|
||||
|
||||
if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) {
|
||||
let original_request = ongoing_request.request.clone();
|
||||
let request_duration = ongoing_request.emitted.elapsed();
|
||||
connection.ongoing_request = None;
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
@@ -634,6 +703,7 @@ where
|
||||
let ev = Event::RequestTimeout {
|
||||
peer: peer.clone(),
|
||||
original_request,
|
||||
request_duration,
|
||||
};
|
||||
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user