restructure polkadot-node-jaeger (#2642)

This commit is contained in:
Bernhard Schuster
2021-03-19 16:51:16 +01:00
committed by GitHub
parent 59640a38bc
commit ea6294fa79
19 changed files with 658 additions and 498 deletions
@@ -116,7 +116,7 @@ struct RunningTask {
/// Sender for communicating with other subsystems and reporting results.
sender: mpsc::Sender<FromFetchTask>,
/// Prometheues metrics for reporting results.
metrics: Metrics,
@@ -145,8 +145,8 @@ impl FetchTaskConfig {
};
}
let mut span = jaeger::candidate_hash_span(&core.candidate_hash, "availability-distribution");
span.add_stage(jaeger::Stage::AvailabilityDistribution);
let span = jaeger::Span::new(core.candidate_hash, "availability-distribution")
.with_stage(jaeger::Stage::AvailabilityDistribution);
let prepared_running = RunningTask {
session_index: session_info.session_index,
@@ -263,10 +263,9 @@ impl RunningTask {
let mut bad_validators = Vec::new();
let mut label = FAILED;
let mut count: u32 = 0;
let mut _span = self.span.child_builder("fetch-task")
let mut _span = self.span.child("fetch-task")
.with_chunk_index(self.request.index.0)
.with_relay_parent(&self.relay_parent)
.build();
.with_relay_parent(self.relay_parent);
// Try validators in reverse order:
while let Some(validator) = self.group.pop() {
let _try_span = _span.child("try");
@@ -64,11 +64,11 @@ pub async fn answer_request<Context>(
where
Context: SubsystemContext,
{
let mut span = jaeger::candidate_hash_span(&req.payload.candidate_hash, "answer-request");
span.add_stage(jaeger::Stage::AvailabilityDistribution);
let _child_span = span.child_builder("answer-chunk-request")
.with_chunk_index(req.payload.index.0)
.build();
let span = jaeger::Span::new(req.payload.candidate_hash, "answer-request")
.with_stage(jaeger::Stage::AvailabilityDistribution);
let _child_span = span.child("answer-chunk-request")
.with_chunk_index(req.payload.index.0);
let chunk = query_chunk(ctx, req.payload.candidate_hash, req.payload.index).await?;
@@ -665,8 +665,8 @@ async fn handle_recover(
) -> error::Result<()> {
let candidate_hash = receipt.hash();
let mut span = jaeger::candidate_hash_span(&candidate_hash, "availbility-recovery");
span.add_stage(jaeger::Stage::AvailabilityRecovery);
let span = jaeger::Span::new(candidate_hash, "availbility-recovery")
.with_stage(jaeger::Stage::AvailabilityRecovery);
if let Some(result) = state.availability_lru.get(&candidate_hash) {
if let Err(e) = response_sender.send(result.clone()) {
@@ -400,11 +400,10 @@ where
};
let mut _span = job_data.span
.child_builder("msg-received")
.child("msg-received")
.with_peer_id(&origin)
.with_claimed_validator_index(message.signed_availability.validator_index())
.with_stage(jaeger::Stage::BitfieldDistribution)
.build();
.with_stage(jaeger::Stage::BitfieldDistribution);
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
@@ -505,7 +505,7 @@ async fn process_msg(
DistributeCollation(receipt, pov, result_sender) => {
let _span1 = state.span_per_relay_parent
.get(&receipt.descriptor.relay_parent).map(|s| s.child("distributing-collation"));
let _span2 = jaeger::pov_span(&pov, "distributing-collation");
let _span2 = jaeger::Span::new(&pov, "distributing-collation");
match state.collating_on {
Some(id) if receipt.descriptor.para_id != id => {
// If the ParaId of a collation requested to be distributed does not match
@@ -331,9 +331,8 @@ where
from_collator: response_recv.boxed().fuse(),
to_requester: result,
span: state.span_per_relay_parent.get(&relay_parent).map(|s| {
s.child_builder("collation-request")
s.child("collation-request")
.with_para_id(para_id)
.build()
}),
};
@@ -695,7 +694,7 @@ where
);
// Actual sending:
let _span = jaeger::pov_span(&pov, "received-collation");
let _span = jaeger::Span::new(&pov, "received-collation");
let (mut tx, _) = oneshot::channel();
std::mem::swap(&mut tx, &mut (per_req.to_requester));
let result = tx.send((receipt, pov));
@@ -1116,7 +1115,7 @@ mod tests {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError)
) => {
let req = reqs.into_iter().next()
.expect("There should be exactly one request");
.expect("There should be exactly one request");
match req {
Requests::CollationFetching(req) => {
let payload = req.payload;
@@ -1145,7 +1144,7 @@ mod tests {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError)
) => {
let req = reqs.into_iter().next()
.expect("There should be exactly one request");
.expect("There should be exactly one request");
match req {
Requests::CollationFetching(req) => {
let payload = req.payload;
@@ -154,9 +154,8 @@ async fn handle_signal(
let _timer = state.metrics.time_handle_signal();
for (relay_parent, span) in activated {
let _span = span.child_builder("pov-dist")
.with_stage(jaeger::Stage::PoVDistribution)
.build();
let _span = span.child("pov-dist")
.with_stage(jaeger::Stage::PoVDistribution);
match request_validators_ctx(relay_parent, ctx).await {
Ok(vals_rx) => {
@@ -517,10 +517,9 @@ async fn circulate_statement_and_dependents(
None => return,
};
let _span = active_head.span.child_builder("circulate-statement")
.with_candidate(&statement.payload().candidate_hash())
.with_stage(jaeger::Stage::StatementDistribution)
.build();
let _span = active_head.span.child("circulate-statement")
.with_candidate(statement.payload().candidate_hash())
.with_stage(jaeger::Stage::StatementDistribution);
// First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement.
@@ -539,9 +538,8 @@ async fn circulate_statement_and_dependents(
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
for peer in peers_needing_dependents {
if let Some(peer_data) = peers.get_mut(&peer) {
let _span_loop = _span.child_builder("to-peer")
.with_peer_id(&peer)
.build();
let _span_loop = _span.child("to-peer")
.with_peer_id(&peer);
// defensive: the peer data should always be some because the iterator
// of peers is derived from the set of peers.
send_statements_about(
@@ -702,10 +700,9 @@ async fn handle_incoming_message<'a>(
};
let candidate_hash = statement.payload().candidate_hash();
let handle_incoming_span = active_head.span.child_builder("handle-incoming")
.with_candidate(&candidate_hash)
.with_peer_id(&peer)
.build();
let handle_incoming_span = active_head.span.child("handle-incoming")
.with_candidate(candidate_hash)
.with_peer_id(&peer);
// check the signature on the statement.
if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) {