mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-28 15:37:56 +00:00
Include the network overhead in the request-in-time metric (#5854)
This commit is contained in:
@@ -74,12 +74,12 @@ pub type Error = Box<dyn std::error::Error + 'static>;
|
||||
/// Event generated by the block requests behaviour.
|
||||
#[derive(Debug)]
|
||||
pub enum Event<B: Block> {
|
||||
/// A request came and we answered it.
|
||||
/// A request came and we have successfully answered it.
|
||||
AnsweredRequest {
|
||||
/// Peer which has emitted the request.
|
||||
peer: PeerId,
|
||||
/// Time it took to compute the response.
|
||||
response_build_time: Duration,
|
||||
/// Time elapsed between when we received the request and when we sent back the response.
|
||||
total_handling_time: Duration,
|
||||
},
|
||||
|
||||
/// A response to a block request has arrived.
|
||||
@@ -190,8 +190,9 @@ pub struct BlockRequests<B: Block> {
|
||||
chain: Arc<dyn Client<B>>,
|
||||
/// List of all active connections and the requests we've sent.
|
||||
peers: HashMap<PeerId, Vec<Connection<B>>>,
|
||||
/// Futures sending back the block request response.
|
||||
outgoing: FuturesUnordered<BoxFuture<'static, ()>>,
|
||||
/// Futures sending back the block request response. Returns the `PeerId` we sent back to, and
|
||||
/// the total time the handling of this request took.
|
||||
outgoing: FuturesUnordered<BoxFuture<'static, (PeerId, Duration)>>,
|
||||
/// Events to return as soon as possible from `poll`.
|
||||
pending_events: VecDeque<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>,
|
||||
}
|
||||
@@ -533,9 +534,7 @@ where
|
||||
node_event: NodeEvent<B, NegotiatedSubstream>
|
||||
) {
|
||||
match node_event {
|
||||
NodeEvent::Request(request, mut stream) => {
|
||||
let before_answer_build = Instant::now();
|
||||
|
||||
NodeEvent::Request(request, mut stream, handling_start) => {
|
||||
match self.on_block_request(&peer, &request) {
|
||||
Ok(res) => {
|
||||
log::trace!(
|
||||
@@ -551,7 +550,7 @@ where
|
||||
peer, e
|
||||
)
|
||||
} else {
|
||||
let future = async move {
|
||||
self.outgoing.push(async move {
|
||||
if let Err(e) = write_one(&mut stream, data).await {
|
||||
log::debug!(
|
||||
target: "sync",
|
||||
@@ -559,8 +558,8 @@ where
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
self.outgoing.push(future.boxed())
|
||||
(peer, handling_start.elapsed())
|
||||
}.boxed());
|
||||
}
|
||||
}
|
||||
Err(e) => log::debug!(
|
||||
@@ -568,12 +567,6 @@ where
|
||||
"Error handling block request from peer {}: {}", peer, e
|
||||
)
|
||||
}
|
||||
|
||||
let ev = Event::AnsweredRequest {
|
||||
peer: peer.clone(),
|
||||
response_build_time: before_answer_build.elapsed(),
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
NodeEvent::Response(original_request, response) => {
|
||||
log::trace!(
|
||||
@@ -711,7 +704,14 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {}
|
||||
while let Poll::Ready(Some((peer, total_handling_time))) = self.outgoing.poll_next_unpin(cx) {
|
||||
let ev = Event::AnsweredRequest {
|
||||
peer,
|
||||
total_handling_time,
|
||||
};
|
||||
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
@@ -719,8 +719,9 @@ where
|
||||
/// Output type of inbound and outbound substream upgrades.
|
||||
#[derive(Debug)]
|
||||
pub enum NodeEvent<B: Block, T> {
|
||||
/// Incoming request from remote and substream to use for the response.
|
||||
Request(schema::v1::BlockRequest, T),
|
||||
/// Incoming request from remote, substream to use for the response, and when we started
|
||||
/// handling this request.
|
||||
Request(schema::v1::BlockRequest, T, Instant),
|
||||
/// Incoming response from remote.
|
||||
Response(message::BlockRequest<B>, schema::v1::BlockResponse),
|
||||
}
|
||||
@@ -760,11 +761,14 @@ where
|
||||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
|
||||
|
||||
fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future {
|
||||
// This `Instant` will be passed around until the processing of this request is done.
|
||||
let handling_start = Instant::now();
|
||||
|
||||
let future = async move {
|
||||
let len = self.max_request_len;
|
||||
let vec = read_one(&mut s, len).await?;
|
||||
match schema::v1::BlockRequest::decode(&vec[..]) {
|
||||
Ok(r) => Ok(NodeEvent::Request(r, s)),
|
||||
Ok(r) => Ok(NodeEvent::Request(r, s, handling_start)),
|
||||
Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)))
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user