Consolidate subsystem spans so they are all children of the leaf-activated root span (#6458)

* Pass the PerLeafSpan as mutable reference to handle_new_head function

* cargo +nightly fmt --all

* Add mock span for test

* cargo +nightly fmt --all

* add new-blocks-hashes to span

* ref span in match statement, set span to disabled if not passed

* remove second match clause, make handle_new_head_span mutable

* cargo +nightly fmt --all

* improve tag on error and warning

* add imported blocks and info span

* cargo +nightly fmt --all

* Improve error for imported_blocks_and_info trace

* format tags on get_header_span

* add lost-to-finality tag

* add missing bracket

* - Add bitfield child span
- Add block db insertion span

* - fix update-bitfield span tag

* - Fix type conversion to u64
- Add missing argument

* - Cargo fmt

* - Test add_follows_from

* - Revert as  relationship between spans not working correctly

* - use drop to test if parent-child relationship can be re-established

* - remove bitfield span, check if parent-child relationship can be reestablished

* - Remove dangling bitfield span which is not used, to see if parent-child relationship can be re-established

* Another dangling bitfield span

* cargo fmt

* - add imported blocks and info span
- add candidate span per candidate

* add tags before moving block_header to push scope

* - Add db-insertion span

* cargo fmt

* fix types

* * Pass mutable reference to span in handle_new_head
* Change get-header-span tags in handle_new_head
* Create cache-session-info span in handle_new_head
* Create optional argument in determine_new_blocks
* Pass mutable reference to handle_new_head_span in determine_new_blocks in handle_new_head function
* Add candidate-hash, candidate-number, lost-to-finality tags to candidate_span in handle_new_head function
* Manually drop db_insertion_span and remove superfluous tags  to it, only keeping approved-bitfields tag
* Add ApprovalVoting stage in jaeger

* * Pass mutable reference to jaeger::Span in stead of PerLeafSpan
* Add block-import span

* *Pass optional_span (optional argument) to determine_new_blocks util function

* * Add num-candidates int tag to block_import_span

* * Add head tag to cache_session_span

* * Create PerLeafSpan in handle_from_overseer (this is required to establish parent-child relationship between approval-voting span, and leaf-activated root span)

* * Add candidate-import-span as child of block-import-span
* Add candidate-hash and num-approval tags to candidate-import-span

* * Fix num-candidate tag to bitvec-len tag in candidate-import-span

* *Fix imported_blocKs_and_info span to create new-block-span as not dealing with candidates

* Consider the future::select! block

* Use HashMap<Hash, jaeger::PerLeafSpan>

* Remove Stage 9

* Add missing spans

* cargo +nightly fmt --all

* Remove optional span argument for determine_new_blocks

* * Remove no-longer needed default PerLeafSpan implementation
* Remove no-longer necessary mock span given re-factoring of handle_new_head() no longer neeing mutable span
* Split validation-result and request-data (availability and validation code) spans into two by dropping request_validation_data_spans
* Remove drop statements for cache_session_info_span
*

* Remove unnecessary span

* Remove another excessively spammy span

* Add missing spans from State in import tests

* Use functional approach to get spans

* - Add functional approach for the approval-voting span
- Add doc on block_numbers given labelling ambiguity
- Add span pruning logic
- Use .add_para_id on validation_result_span

* Replace for hash_set in hash_set_iter with map closure

* cargo +nightly fmt --all

* Change from unconsumed `map` to `.for_each`

* cargo +nightly fmt --all

* Refactor add_para_id to validation_result_span

* cargo +nightly fmt --all

* Remove duplicate tag

* Add missing tag to handle-approved-ancestor span

* Refactor span pruning to only invoke retain once

* Typo in span name

* - Replace unwrap_or with unwrap_or_else due to lazy evaluation of trace-identifier in polkadot_node_jaeger
- Remove some redundant spans

* Add approval-distribution spans

* - Add unwrap_or_else on note-approved-in-chain-selection
- Use child_with_trace_id to add traceID string tag on span (note this does not change the traceID, but just adds a tag)

* cargo +nightly fmt --all

* - Add traceID tags were necessary in approval-voting and availability-distribution
- Always use block-hash tag in stead of relay-parent tag in approval-distribution

* Remove schedule-wakeup span as it will duplicate spans on existing wakeups (which should be a no-op)

* Remove a couple of warnings related to mutability

* Fix failing tests in availability distribution

* Add traceID tag to launch-approval and validation-result

* Reshuffle the validation and validation result spans to where more appropriate and add block-hash tag

* - Add tranche and should-trigger tag to process-wakeup span
- Add candidate-hash and traceID to check-and-import-approval span

* cargo fmt

* - Adjustments after PR comments

* Move span pruning after other pruning logic

* Remove DerefMut - no longer needed

* Relabel request-chunk spans

* - Fix typo in span label
- Add docs for drops

* Add new approval-voting span pruning logic

* Undo removal of !

* cargo fmt
This commit is contained in:
Mattia L.V. Bradascio
2023-03-31 16:54:19 +01:00
committed by GitHub
parent 9fe528d5c7
commit 713f6625fa
12 changed files with 349 additions and 90 deletions
+1
View File
@@ -6559,6 +6559,7 @@ dependencies = [
"env_logger 0.9.0", "env_logger 0.9.0",
"futures", "futures",
"log", "log",
"polkadot-node-jaeger",
"polkadot-node-metrics", "polkadot-node-metrics",
"polkadot-node-network-protocol", "polkadot-node-network-protocol",
"polkadot-node-primitives", "polkadot-node-primitives",
@@ -329,13 +329,17 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
finalized_number: &Option<BlockNumber>, finalized_number: &Option<BlockNumber>,
) -> SubsystemResult<Vec<BlockImportedCandidates>> { ) -> SubsystemResult<Vec<BlockImportedCandidates>> {
const MAX_HEADS_LOOK_BACK: BlockNumber = MAX_FINALITY_LAG; const MAX_HEADS_LOOK_BACK: BlockNumber = MAX_FINALITY_LAG;
let _handle_new_head_span = state
let mut span = jaeger::Span::new(head, "approval-checking-import"); .spans
.get(&head)
.map(|span| span.child("handle-new-head"))
.unwrap_or_else(|| jaeger::Span::new(head, "handle-new-head"))
.with_string_tag("head", format!("{:?}", head))
.with_stage(jaeger::Stage::ApprovalChecking);
let header = { let header = {
let (h_tx, h_rx) = oneshot::channel(); let (h_tx, h_rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await; ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await;
match h_rx.await? { match h_rx.await? {
Err(e) => { Err(e) => {
gum::debug!( gum::debug!(
@@ -343,11 +347,12 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
"Chain API subsystem temporarily unreachable {}", "Chain API subsystem temporarily unreachable {}",
e, e,
); );
// May be a better way of handling errors here.
return Ok(Vec::new()) return Ok(Vec::new())
}, },
Ok(None) => { Ok(None) => {
gum::warn!(target: LOG_TARGET, "Missing header for new head {}", head); gum::warn!(target: LOG_TARGET, "Missing header for new head {}", head);
// May be a better way of handling warnings here.
return Ok(Vec::new()) return Ok(Vec::new())
}, },
Ok(Some(h)) => h, Ok(Some(h)) => h,
@@ -363,7 +368,6 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
?e, ?e,
"Could not cache session info when processing head.", "Could not cache session info when processing head.",
); );
return Ok(Vec::new()) return Ok(Vec::new())
}, },
Ok(Some(a @ SessionWindowUpdate::Advanced { .. })) => { Ok(Some(a @ SessionWindowUpdate::Advanced { .. })) => {
@@ -391,8 +395,6 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
.map_err(|e| SubsystemError::with_origin("approval-voting", e)) .map_err(|e| SubsystemError::with_origin("approval-voting", e))
.await?; .await?;
span.add_uint_tag("new-blocks", new_blocks.len() as u64);
if new_blocks.is_empty() { if new_blocks.is_empty() {
return Ok(Vec::new()) return Ok(Vec::new())
} }
@@ -473,6 +475,7 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
); );
(block_tick, no_show_duration) (block_tick, no_show_duration)
}; };
let needed_approvals = session_info.needed_approvals; let needed_approvals = session_info.needed_approvals;
let validator_group_lens: Vec<usize> = let validator_group_lens: Vec<usize> =
session_info.validator_groups.iter().map(|v| v.len()).collect(); session_info.validator_groups.iter().map(|v| v.len()).collect();
@@ -507,11 +510,9 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
result.len(), result.len(),
); );
} }
result result
} }
}; };
// If all bits are already set, then send an approve message. // If all bits are already set, then send an approve message.
if approved_bitfield.count_ones() == approved_bitfield.len() { if approved_bitfield.count_ones() == approved_bitfield.len() {
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
@@ -602,7 +603,6 @@ pub(crate) async fn handle_new_head<Context, B: Backend>(
); );
ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta)); ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta));
Ok(imported_candidates) Ok(imported_candidates)
} }
@@ -661,6 +661,7 @@ pub(crate) mod tests {
assignment_criteria: Box::new(MockAssignmentCriteria), assignment_criteria: Box::new(MockAssignmentCriteria),
db, db,
db_config: TEST_CONFIG, db_config: TEST_CONFIG,
spans: HashMap::new(),
} }
} }
+161 -37
View File
@@ -21,6 +21,7 @@
//! of others. It uses this information to determine when candidates and blocks have //! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize. //! been sufficiently approved to finalize.
use jaeger::{hash_to_trace_identifier, PerLeafSpan};
use polkadot_node_jaeger as jaeger; use polkadot_node_jaeger as jaeger;
use polkadot_node_primitives::{ use polkadot_node_primitives::{
approval::{ approval::{
@@ -478,7 +479,11 @@ impl Wakeups {
self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash)); self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash));
} }
fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) { fn prune_finalized_wakeups(
&mut self,
finalized_number: BlockNumber,
spans: &mut HashMap<Hash, PerLeafSpan>,
) {
let after = self.block_numbers.split_off(&(finalized_number + 1)); let after = self.block_numbers.split_off(&(finalized_number + 1));
let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after) let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after)
.into_iter() .into_iter()
@@ -502,6 +507,9 @@ impl Wakeups {
} }
} }
} }
// Remove all spans that are associated with pruned blocks.
spans.retain(|h, _| !pruned_blocks.contains(h));
} }
// Get the wakeup for a particular block/candidate combo, if any. // Get the wakeup for a particular block/candidate combo, if any.
@@ -639,6 +647,7 @@ struct State {
// Require for `RollingSessionWindow`. // Require for `RollingSessionWindow`.
db_config: DatabaseConfig, db_config: DatabaseConfig,
db: Arc<dyn Database>, db: Arc<dyn Database>,
spans: HashMap<Hash, jaeger::PerLeafSpan>,
} }
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
@@ -777,6 +786,7 @@ where
assignment_criteria, assignment_criteria,
db_config: subsystem.db_config, db_config: subsystem.db_config,
db: subsystem.db, db: subsystem.db,
spans: HashMap::new(),
}; };
let mut wakeups = Wakeups::default(); let mut wakeups = Wakeups::default();
@@ -798,14 +808,13 @@ where
loop { loop {
let mut overlayed_db = OverlayedBackend::new(&backend); let mut overlayed_db = OverlayedBackend::new(&backend);
let actions = futures::select! { let actions = futures::select! {
(tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => {
subsystem.metrics.on_wakeup(); subsystem.metrics.on_wakeup();
process_wakeup( process_wakeup(
&mut state, &mut state,
&mut overlayed_db, &mut overlayed_db,
woken_block, woken_block,
woken_candidate, woken_candidate,
tick,
&subsystem.metrics, &subsystem.metrics,
)? )?
} }
@@ -878,7 +887,6 @@ where
if !overlayed_db.is_empty() { if !overlayed_db.is_empty() {
let _timer = subsystem.metrics.time_db_transaction(); let _timer = subsystem.metrics.time_db_transaction();
let ops = overlayed_db.into_write_ops(); let ops = overlayed_db.into_write_ops();
backend.write(ops)?; backend.write(ops)?;
} }
@@ -919,12 +927,12 @@ async fn handle_actions<Context>(
actions: Vec<Action>, actions: Vec<Action>,
) -> SubsystemResult<bool> { ) -> SubsystemResult<bool> {
let mut conclude = false; let mut conclude = false;
let mut actions_iter = actions.into_iter(); let mut actions_iter = actions.into_iter();
while let Some(action) = actions_iter.next() { while let Some(action) = actions_iter.next() {
match action { match action {
Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => {
wakeups.schedule(block_hash, block_number, candidate_hash, tick), wakeups.schedule(block_hash, block_number, candidate_hash, tick);
},
Action::IssueApproval(candidate_hash, approval_request) => { Action::IssueApproval(candidate_hash, approval_request) => {
// Note that the IssueApproval action will create additional // Note that the IssueApproval action will create additional
// actions that will need to all be processed before we can // actions that will need to all be processed before we can
@@ -968,8 +976,18 @@ async fn handle_actions<Context>(
continue continue
} }
let mut launch_approval_span = state
.spans
.get(&relay_block_hash)
.map(|span| span.child("launch-approval"))
.unwrap_or_else(|| jaeger::Span::new(candidate_hash, "launch-approval"))
.with_trace_id(candidate_hash)
.with_candidate(candidate_hash)
.with_stage(jaeger::Stage::ApprovalChecking);
metrics.on_assignment_produced(assignment_tranche); metrics.on_assignment_produced(assignment_tranche);
let block_hash = indirect_cert.block_hash; let block_hash = indirect_cert.block_hash;
launch_approval_span.add_string_tag("block-hash", format!("{:?}", block_hash));
let validator_index = indirect_cert.validator; let validator_index = indirect_cert.validator;
ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment( ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment(
@@ -1004,6 +1022,7 @@ async fn handle_actions<Context>(
validator_index, validator_index,
block_hash, block_hash,
backing_group, backing_group,
&launch_approval_span,
) )
.await .await
}, },
@@ -1014,12 +1033,21 @@ async fn handle_actions<Context>(
} }
}, },
Action::NoteApprovedInChainSelection(block_hash) => { Action::NoteApprovedInChainSelection(block_hash) => {
let _span = state
.spans
.get(&block_hash)
.map(|span| span.child("note-approved-in-chain-selection"))
.unwrap_or_else(|| {
jaeger::Span::new(block_hash, "note-approved-in-chain-selection")
})
.with_string_tag("block-hash", format!("{:?}", block_hash))
.with_stage(jaeger::Stage::ApprovalChecking);
ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await;
}, },
Action::BecomeActive => { Action::BecomeActive => {
*mode = Mode::Active; *mode = Mode::Active;
let messages = distribution_messages_for_activation(overlayed_db)?; let messages = distribution_messages_for_activation(overlayed_db, state)?;
ctx.send_messages(messages.into_iter()).await; ctx.send_messages(messages.into_iter()).await;
}, },
@@ -1034,6 +1062,7 @@ async fn handle_actions<Context>(
fn distribution_messages_for_activation( fn distribution_messages_for_activation(
db: &OverlayedBackend<'_, impl Backend>, db: &OverlayedBackend<'_, impl Backend>,
state: &mut State,
) -> SubsystemResult<Vec<ApprovalDistributionMessage>> { ) -> SubsystemResult<Vec<ApprovalDistributionMessage>> {
let all_blocks: Vec<Hash> = db.load_all_blocks()?; let all_blocks: Vec<Hash> = db.load_all_blocks()?;
@@ -1043,6 +1072,15 @@ fn distribution_messages_for_activation(
messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value. messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.
for block_hash in all_blocks { for block_hash in all_blocks {
let mut distribution_message_span = state
.spans
.get(&block_hash)
.map(|span| span.child("distribution-messages-for-activation"))
.unwrap_or_else(|| {
jaeger::Span::new(block_hash, "distribution-messages-for-activation")
})
.with_stage(jaeger::Stage::ApprovalChecking)
.with_string_tag("block-hash", format!("{:?}", block_hash));
let block_entry = match db.load_block_entry(&block_hash)? { let block_entry = match db.load_block_entry(&block_hash)? {
Some(b) => b, Some(b) => b,
None => { None => {
@@ -1051,6 +1089,10 @@ fn distribution_messages_for_activation(
continue continue
}, },
}; };
distribution_message_span.add_string_tag("block-hash", &block_hash.to_string());
distribution_message_span
.add_string_tag("parent-hash", &block_entry.parent_hash().to_string());
approval_meta.push(BlockApprovalMeta { approval_meta.push(BlockApprovalMeta {
hash: block_hash, hash: block_hash,
number: block_entry.block_number(), number: block_entry.block_number(),
@@ -1061,6 +1103,8 @@ fn distribution_messages_for_activation(
}); });
for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() { for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() {
let _candidate_span =
distribution_message_span.child("candidate").with_candidate(*candidate_hash);
let candidate_entry = match db.load_candidate_entry(&candidate_hash)? { let candidate_entry = match db.load_candidate_entry(&candidate_hash)? {
Some(c) => c, Some(c) => c,
None => { None => {
@@ -1140,9 +1184,11 @@ async fn handle_from_overseer<Context>(
let actions = match x { let actions = match x {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
let mut actions = Vec::new(); let mut actions = Vec::new();
if let Some(activated) = update.activated { if let Some(activated) = update.activated {
let head = activated.hash; let head = activated.hash;
let approval_voting_span =
jaeger::PerLeafSpan::new(activated.span, "approval-voting");
state.spans.insert(head, approval_voting_span);
match import::handle_new_head(ctx, state, db, head, &*last_finalized_height).await { match import::handle_new_head(ctx, state, db, head, &*last_finalized_height).await {
Err(e) => return Err(SubsystemError::with_origin("db", e)), Err(e) => return Err(SubsystemError::with_origin("db", e)),
Ok(block_imported_candidates) => { Ok(block_imported_candidates) => {
@@ -1199,7 +1245,12 @@ async fn handle_from_overseer<Context>(
crate::ops::canonicalize(db, block_number, block_hash) crate::ops::canonicalize(db, block_number, block_hash)
.map_err(|e| SubsystemError::with_origin("db", e))?; .map_err(|e| SubsystemError::with_origin("db", e))?;
wakeups.prune_finalized_wakeups(block_number); // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans accordingly.
wakeups.prune_finalized_wakeups(block_number, &mut state.spans);
// // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans accordingly.
// let hash_set = wakeups.block_numbers.values().flatten().collect::<HashSet<_>>();
// state.spans.retain(|hash, _| hash_set.contains(hash));
Vec::new() Vec::new()
}, },
@@ -1220,7 +1271,23 @@ async fn handle_from_overseer<Context>(
})? })?
.0, .0,
ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => { ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => {
match handle_approved_ancestor(ctx, db, target, lower_bound, wakeups).await { let mut approved_ancestor_span = state
.spans
.get(&target)
.map(|span| span.child("approved-ancestor"))
.unwrap_or_else(|| jaeger::Span::new(target, "approved-ancestor"))
.with_stage(jaeger::Stage::ApprovalChecking)
.with_string_tag("leaf", format!("{:?}", target));
match handle_approved_ancestor(
ctx,
db,
target,
lower_bound,
wakeups,
&mut approved_ancestor_span,
)
.await
{
Ok(v) => { Ok(v) => {
let _ = res.send(v); let _ = res.send(v);
}, },
@@ -1342,15 +1409,15 @@ async fn handle_approved_ancestor<Context>(
target: Hash, target: Hash,
lower_bound: BlockNumber, lower_bound: BlockNumber,
wakeups: &Wakeups, wakeups: &Wakeups,
span: &mut jaeger::Span,
) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> { ) -> SubsystemResult<Option<HighestApprovedAncestorBlock>> {
const MAX_TRACING_WINDOW: usize = 200; const MAX_TRACING_WINDOW: usize = 200;
const ABNORMAL_DEPTH_THRESHOLD: usize = 5; const ABNORMAL_DEPTH_THRESHOLD: usize = 5;
let mut span = span
.child("handle-approved-ancestor")
.with_stage(jaeger::Stage::ApprovalChecking);
use bitvec::{order::Lsb0, vec::BitVec}; use bitvec::{order::Lsb0, vec::BitVec};
let mut span =
jaeger::Span::new(&target, "approved-ancestor").with_stage(jaeger::Stage::ApprovalChecking);
let mut all_approved_max = None; let mut all_approved_max = None;
let target_number = { let target_number = {
@@ -1365,13 +1432,12 @@ async fn handle_approved_ancestor<Context>(
} }
}; };
span.add_uint_tag("leaf-number", target_number as u64);
span.add_uint_tag("lower-bound", lower_bound as u64);
if target_number <= lower_bound { if target_number <= lower_bound {
return Ok(None) return Ok(None)
} }
span.add_string_fmt_debug_tag("target-number", target_number);
span.add_string_fmt_debug_tag("target-hash", target);
// request ancestors up to but not including the lower bound, // request ancestors up to but not including the lower bound,
// as a vote on the lower bound is implied if we cannot find // as a vote on the lower bound is implied if we cannot find
// anything else. // anything else.
@@ -1397,6 +1463,9 @@ async fn handle_approved_ancestor<Context>(
let mut bits: BitVec<u8, Lsb0> = Default::default(); let mut bits: BitVec<u8, Lsb0> = Default::default();
for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() { for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
let mut entry_span =
span.child("load-block-entry").with_stage(jaeger::Stage::ApprovalChecking);
entry_span.add_string_tag("block-hash", format!("{:?}", block_hash));
// Block entries should be present as the assumption is that // Block entries should be present as the assumption is that
// nothing here is finalized. If we encounter any missing block // nothing here is finalized. If we encounter any missing block
// entries we can fail. // entries we can fail.
@@ -1452,7 +1521,7 @@ async fn handle_approved_ancestor<Context>(
unapproved.len(), unapproved.len(),
entry.candidates().len(), entry.candidates().len(),
); );
entry_span.add_uint_tag("unapproved-candidates", unapproved.len() as u64);
for candidate_hash in unapproved { for candidate_hash in unapproved {
match db.load_candidate_entry(&candidate_hash)? { match db.load_candidate_entry(&candidate_hash)? {
None => { None => {
@@ -1575,8 +1644,8 @@ async fn handle_approved_ancestor<Context>(
}); });
match all_approved_max { match all_approved_max {
Some(HighestApprovedAncestorBlock { ref hash, ref number, .. }) => { Some(HighestApprovedAncestorBlock { ref hash, ref number, .. }) => {
span.add_uint_tag("approved-number", *number as u64); span.add_uint_tag("highest-approved-number", *number as u64);
span.add_string_fmt_debug_tag("approved-hash", hash); span.add_string_fmt_debug_tag("highest-approved-hash", hash);
}, },
None => { None => {
span.add_string_tag("reached-lower-bound", "true"); span.add_string_tag("reached-lower-bound", "true");
@@ -1677,6 +1746,15 @@ fn check_and_import_assignment(
) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)> { ) -> SubsystemResult<(AssignmentCheckResult, Vec<Action>)> {
let tick_now = state.clock.tick_now(); let tick_now = state.clock.tick_now();
let mut check_and_import_assignment_span = state
.spans
.get(&assignment.block_hash)
.map(|span| span.child("check-and-import-assignment"))
.unwrap_or_else(|| jaeger::Span::new(assignment.block_hash, "check-and-import-assignment"))
.with_relay_parent(assignment.block_hash)
.with_uint_tag("candidate-index", candidate_index as u64)
.with_stage(jaeger::Stage::ApprovalChecking);
let block_entry = match db.load_block_entry(&assignment.block_hash)? { let block_entry = match db.load_block_entry(&assignment.block_hash)? {
Some(b) => b, Some(b) => b,
None => None =>
@@ -1711,6 +1789,13 @@ fn check_and_import_assignment(
)), // no candidate at core. )), // no candidate at core.
}; };
check_and_import_assignment_span
.add_string_tag("candidate-hash", format!("{:?}", assigned_candidate_hash));
check_and_import_assignment_span.add_string_tag(
"traceID",
format!("{:?}", jaeger::hash_to_trace_identifier(assigned_candidate_hash.0)),
);
let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? { let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? {
Some(c) => c, Some(c) => c,
None => None =>
@@ -1769,6 +1854,8 @@ fn check_and_import_assignment(
}, },
}; };
check_and_import_assignment_span.add_uint_tag("tranche", tranche as u64);
let is_duplicate = approval_entry.is_assigned(assignment.validator); let is_duplicate = approval_entry.is_assigned(assignment.validator);
approval_entry.import_assignment(tranche, assignment.validator, tick_now); approval_entry.import_assignment(tranche, assignment.validator, tick_now);
@@ -1822,6 +1909,15 @@ fn check_and_import_approval<T>(
}}; }};
} }
let mut span = state
.spans
.get(&approval.block_hash)
.map(|span| span.child("check-and-import-approval"))
.unwrap_or_else(|| jaeger::Span::new(approval.block_hash, "check-and-import-approval"))
.with_uint_tag("candidate-index", approval.candidate_index as u64)
.with_relay_parent(approval.block_hash)
.with_stage(jaeger::Stage::ApprovalChecking);
let block_entry = match db.load_block_entry(&approval.block_hash)? { let block_entry = match db.load_block_entry(&approval.block_hash)? {
Some(b) => b, Some(b) => b,
None => { None => {
@@ -1847,6 +1943,12 @@ fn check_and_import_approval<T>(
)), )),
}; };
span.add_string_tag("candidate-hash", format!("{:?}", approved_candidate_hash));
span.add_string_tag(
"traceID",
format!("{:?}", hash_to_trace_identifier(approved_candidate_hash.0)),
);
let pubkey = match session_info.validators.get(approval.validator) { let pubkey = match session_info.validators.get(approval.validator) {
Some(k) => k, Some(k) => k,
None => respond_early!(ApprovalCheckResult::Bad( None => respond_early!(ApprovalCheckResult::Bad(
@@ -2120,13 +2222,14 @@ fn process_wakeup(
db: &mut OverlayedBackend<'_, impl Backend>, db: &mut OverlayedBackend<'_, impl Backend>,
relay_block: Hash, relay_block: Hash,
candidate_hash: CandidateHash, candidate_hash: CandidateHash,
expected_tick: Tick,
metrics: &Metrics, metrics: &Metrics,
) -> SubsystemResult<Vec<Action>> { ) -> SubsystemResult<Vec<Action>> {
let _span = jaeger::Span::from_encodable( let mut span = state
(relay_block, candidate_hash, expected_tick), .spans
"process-approval-wakeup", .get(&relay_block)
) .map(|span| span.child("process-wakeup"))
.unwrap_or_else(|| jaeger::Span::new(candidate_hash, "process-wakeup"))
.with_trace_id(candidate_hash)
.with_relay_parent(relay_block) .with_relay_parent(relay_block)
.with_candidate(candidate_hash) .with_candidate(candidate_hash)
.with_stage(jaeger::Stage::ApprovalChecking); .with_stage(jaeger::Stage::ApprovalChecking);
@@ -2159,9 +2262,8 @@ fn process_wakeup(
state.slot_duration_millis, state.slot_duration_millis,
Slot::from(u64::from(session_info.no_show_slots)), Slot::from(u64::from(session_info.no_show_slots)),
); );
let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot()); let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot());
span.add_uint_tag("tranche", tranche_now as u64);
gum::trace!( gum::trace!(
target: LOG_TARGET, target: LOG_TARGET,
tranche = tranche_now, tranche = tranche_now,
@@ -2195,6 +2297,8 @@ fn process_wakeup(
(should_trigger, approval_entry.backing_group()) (should_trigger, approval_entry.backing_group())
}; };
gum::trace!(target: LOG_TARGET, "Wakeup processed. Should trigger: {}", should_trigger);
let mut actions = Vec::new(); let mut actions = Vec::new();
let candidate_receipt = candidate_entry.candidate_receipt().clone(); let candidate_receipt = candidate_entry.candidate_receipt().clone();
@@ -2243,7 +2347,6 @@ fn process_wakeup(
}); });
} }
} }
// Although we checked approval earlier in this function, // Although we checked approval earlier in this function,
// this wakeup might have advanced the state to approved via // this wakeup might have advanced the state to approved via
// a no-show that was immediately covered and therefore // a no-show that was immediately covered and therefore
@@ -2275,6 +2378,7 @@ async fn launch_approval<Context>(
validator_index: ValidatorIndex, validator_index: ValidatorIndex,
block_hash: Hash, block_hash: Hash,
backing_group: GroupIndex, backing_group: GroupIndex,
span: &jaeger::Span,
) -> SubsystemResult<RemoteHandle<ApprovalState>> { ) -> SubsystemResult<RemoteHandle<ApprovalState>> {
let (a_tx, a_rx) = oneshot::channel(); let (a_tx, a_rx) = oneshot::channel();
let (code_tx, code_rx) = oneshot::channel(); let (code_tx, code_rx) = oneshot::channel();
@@ -2306,9 +2410,15 @@ async fn launch_approval<Context>(
let candidate_hash = candidate.hash(); let candidate_hash = candidate.hash();
let para_id = candidate.descriptor.para_id; let para_id = candidate.descriptor.para_id;
gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data."); gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");
let request_validation_data_span = span
.child("request-validation-data")
.with_trace_id(candidate_hash)
.with_candidate(candidate_hash)
.with_string_tag("block-hash", format!("{:?}", block_hash))
.with_stage(jaeger::Stage::ApprovalChecking);
let timer = metrics.time_recover_and_approve(); let timer = metrics.time_recover_and_approve();
ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData( ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData(
candidate.clone(), candidate.clone(),
@@ -2318,6 +2428,13 @@ async fn launch_approval<Context>(
)) ))
.await; .await;
let request_validation_result_span = span
.child("request-validation-result")
.with_trace_id(candidate_hash)
.with_candidate(candidate_hash)
.with_string_tag("block-hash", format!("{:?}", block_hash))
.with_stage(jaeger::Stage::ApprovalChecking);
ctx.send_message(RuntimeApiMessage::Request( ctx.send_message(RuntimeApiMessage::Request(
block_hash, block_hash,
RuntimeApiRequest::ValidationCodeByHash(candidate.descriptor.validation_code_hash, code_tx), RuntimeApiRequest::ValidationCodeByHash(candidate.descriptor.validation_code_hash, code_tx),
@@ -2330,10 +2447,6 @@ async fn launch_approval<Context>(
let background = async move { let background = async move {
// Force the move of the timer into the background task. // Force the move of the timer into the background task.
let _timer = timer; let _timer = timer;
let _span = jaeger::Span::from_encodable((block_hash, candidate_hash), "launch-approval")
.with_relay_parent(block_hash)
.with_candidate(candidate_hash)
.with_stage(jaeger::Stage::ApprovalChecking);
let available_data = match a_rx.await { let available_data = match a_rx.await {
Err(_) => return ApprovalState::failed(validator_index, candidate_hash), Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
@@ -2371,6 +2484,7 @@ async fn launch_approval<Context>(
return ApprovalState::failed(validator_index, candidate_hash) return ApprovalState::failed(validator_index, candidate_hash)
}, },
}; };
drop(request_validation_data_span);
let validation_code = match code_rx.await { let validation_code = match code_rx.await {
Err(_) => return ApprovalState::failed(validator_index, candidate_hash), Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
@@ -2392,7 +2506,6 @@ async fn launch_approval<Context>(
}; };
let (val_tx, val_rx) = oneshot::channel(); let (val_tx, val_rx) = oneshot::channel();
sender sender
.send_message(CandidateValidationMessage::ValidateFromExhaustive( .send_message(CandidateValidationMessage::ValidateFromExhaustive(
available_data.validation_data, available_data.validation_data,
@@ -2430,7 +2543,6 @@ async fn launch_approval<Context>(
candidate_hash, candidate_hash,
candidate.clone(), candidate.clone(),
); );
metrics_guard.take().on_approval_invalid(); metrics_guard.take().on_approval_invalid();
return ApprovalState::failed(validator_index, candidate_hash) return ApprovalState::failed(validator_index, candidate_hash)
}, },
@@ -2443,11 +2555,11 @@ async fn launch_approval<Context>(
"Failed to validate candidate due to internal error", "Failed to validate candidate due to internal error",
); );
metrics_guard.take().on_approval_error(); metrics_guard.take().on_approval_error();
drop(request_validation_result_span);
return ApprovalState::failed(validator_index, candidate_hash) return ApprovalState::failed(validator_index, candidate_hash)
}, },
} }
}; };
let (background, remote_handle) = background.remote_handle(); let (background, remote_handle) = background.remote_handle();
ctx.spawn("approval-checks", Box::pin(background)).map(move |()| remote_handle) ctx.spawn("approval-checks", Box::pin(background)).map(move |()| remote_handle)
} }
@@ -2463,6 +2575,17 @@ async fn issue_approval<Context>(
candidate_hash: CandidateHash, candidate_hash: CandidateHash,
ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest, ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest,
) -> SubsystemResult<Vec<Action>> { ) -> SubsystemResult<Vec<Action>> {
let mut issue_approval_span = state
.spans
.get(&block_hash)
.map(|span| span.child("issue-approval"))
.unwrap_or_else(|| jaeger::Span::new(block_hash, "issue-approval"))
.with_trace_id(candidate_hash)
.with_string_tag("block-hash", format!("{:?}", block_hash))
.with_candidate(candidate_hash)
.with_validator_index(validator_index)
.with_stage(jaeger::Stage::ApprovalChecking);
let block_entry = match db.load_block_entry(&block_hash)? { let block_entry = match db.load_block_entry(&block_hash)? {
Some(b) => b, Some(b) => b,
None => { None => {
@@ -2487,6 +2610,7 @@ async fn issue_approval<Context>(
}, },
Some(idx) => idx, Some(idx) => idx,
}; };
issue_approval_span.add_int_tag("candidate_index", candidate_index as i64);
let session_info = match state.session_info(block_entry.session()) { let session_info = match state.session_info(block_entry.session()) {
Some(s) => s, Some(s) => s,
+8
View File
@@ -149,6 +149,7 @@ pub enum Stage {
AvailabilityRecovery = 6, AvailabilityRecovery = 6,
BitfieldDistribution = 7, BitfieldDistribution = 7,
ApprovalChecking = 8, ApprovalChecking = 8,
ApprovalDistribution = 9,
// Expand as needed, numbers should be ascending according to the stage // Expand as needed, numbers should be ascending according to the stage
// through the inclusion pipeline, or according to the descriptions // through the inclusion pipeline, or according to the descriptions
// in [the path of a para chain block] // in [the path of a para chain block]
@@ -283,6 +284,13 @@ impl Span {
} }
} }
/// Attach a 'traceID' tag set to the decimal representation of the candidate hash.
#[inline(always)]
pub fn with_trace_id(mut self, candidate_hash: CandidateHash) -> Self {
self.add_string_tag("traceID", hash_to_trace_identifier(candidate_hash.0));
self
}
#[inline(always)] #[inline(always)]
pub fn with_string_tag<V: ToString>(mut self, tag: &'static str, val: V) -> Self { pub fn with_string_tag<V: ToString>(mut self, tag: &'static str, val: V) -> Self {
self.add_string_tag::<V>(tag, val); self.add_string_tag::<V>(tag, val);
@@ -10,6 +10,7 @@ polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-primitives = { path = "../../primitives" } polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-primitives = { path = "../../../primitives" } polkadot-primitives = { path = "../../../primitives" }
polkadot-node-jaeger = { path = "../../jaeger" }
rand = "0.8" rand = "0.8"
futures = "0.3.21" futures = "0.3.21"
@@ -21,6 +21,7 @@
#![warn(missing_docs)] #![warn(missing_docs)]
use futures::{channel::oneshot, FutureExt as _}; use futures::{channel::oneshot, FutureExt as _};
use polkadot_node_jaeger as jaeger;
use polkadot_node_network_protocol::{ use polkadot_node_network_protocol::{
self as net_protocol, self as net_protocol,
grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology}, grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
@@ -35,7 +36,7 @@ use polkadot_node_subsystem::{
ApprovalCheckResult, ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalCheckResult, ApprovalDistributionMessage, ApprovalVotingMessage,
AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeTxMessage, AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeTxMessage,
}, },
overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
}; };
use polkadot_primitives::{ use polkadot_primitives::{
BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature, BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature,
@@ -180,6 +181,9 @@ struct State {
/// Config for aggression. /// Config for aggression.
aggression_config: AggressionConfig, aggression_config: AggressionConfig,
/// HashMap from active leaves to spans
spans: HashMap<Hash, jaeger::PerLeafSpan>,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -390,9 +394,18 @@ impl State {
) { ) {
let mut new_hashes = HashSet::new(); let mut new_hashes = HashSet::new();
for meta in &metas { for meta in &metas {
let mut span = self
.spans
.get(&meta.hash)
.map(|span| span.child(&"handle-new-blocks"))
.unwrap_or_else(|| jaeger::Span::new(meta.hash, &"handle-new-blocks"))
.with_string_tag("block-hash", format!("{:?}", meta.hash))
.with_stage(jaeger::Stage::ApprovalDistribution);
match self.blocks.entry(meta.hash) { match self.blocks.entry(meta.hash) {
hash_map::Entry::Vacant(entry) => { hash_map::Entry::Vacant(entry) => {
let candidates_count = meta.candidates.len(); let candidates_count = meta.candidates.len();
span.add_uint_tag("candidates-count", candidates_count as u64);
let mut candidates = Vec::with_capacity(candidates_count); let mut candidates = Vec::with_capacity(candidates_count);
candidates.resize_with(candidates_count, Default::default); candidates.resize_with(candidates_count, Default::default);
@@ -690,6 +703,7 @@ impl State {
if let Some(block_entry) = self.blocks.remove(relay_block) { if let Some(block_entry) = self.blocks.remove(relay_block) {
self.topologies.dec_session_refs(block_entry.session); self.topologies.dec_session_refs(block_entry.session);
} }
self.spans.remove(&relay_block);
}); });
// If a block was finalized, this means we may need to move our aggression // If a block was finalized, this means we may need to move our aggression
@@ -1230,6 +1244,14 @@ impl State {
) -> HashMap<ValidatorIndex, ValidatorSignature> { ) -> HashMap<ValidatorIndex, ValidatorSignature> {
let mut all_sigs = HashMap::new(); let mut all_sigs = HashMap::new();
for (hash, index) in indices { for (hash, index) in indices {
let _span = self
.spans
.get(&hash)
.map(|span| span.child("get-approval-signatures"))
.unwrap_or_else(|| jaeger::Span::new(&hash, "get-approval-signatures"))
.with_string_tag("block-hash", format!("{:?}", hash))
.with_stage(jaeger::Stage::ApprovalDistribution);
let block_entry = match self.blocks.get(&hash) { let block_entry = match self.blocks.get(&hash) {
None => { None => {
gum::debug!( gum::debug!(
@@ -1650,13 +1672,18 @@ impl ApprovalDistribution {
match message { match message {
FromOrchestra::Communication { msg } => FromOrchestra::Communication { msg } =>
Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await, Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await,
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
..
})) => {
gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)"); gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
// the relay chain blocks relevant to the approval subsystems // the relay chain blocks relevant to the approval subsystems
// are those that are available, but not finalized yet // are those that are available, but not finalized yet
// actived and deactivated heads hence are irrelevant to this subsystem // actived and deactivated heads hence are irrelevant to this subsystem, other than
// for tracing purposes.
if let Some(activated) = update.activated {
let head = activated.hash;
let approval_distribution_span =
jaeger::PerLeafSpan::new(activated.span, "approval-distribution");
state.spans.insert(head, approval_distribution_span);
}
}, },
FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
gum::trace!(target: LOG_TARGET, number = %number, "finalized signal"); gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
@@ -1682,6 +1709,14 @@ impl ApprovalDistribution {
state.handle_new_blocks(ctx, metrics, metas, rng).await; state.handle_new_blocks(ctx, metrics, metas, rng).await;
}, },
ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index) => { ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index) => {
let _span = state
.spans
.get(&cert.block_hash)
.map(|span| span.child("import-and-distribute-assignment"))
.unwrap_or_else(|| jaeger::Span::new(&cert.block_hash, "distribute-assignment"))
.with_string_tag("block-hash", format!("{:?}", cert.block_hash))
.with_stage(jaeger::Stage::ApprovalDistribution);
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
"Distributing our assignment on candidate (block={}, index={})", "Distributing our assignment on candidate (block={}, index={})",
@@ -1701,6 +1736,14 @@ impl ApprovalDistribution {
.await; .await;
}, },
ApprovalDistributionMessage::DistributeApproval(vote) => { ApprovalDistributionMessage::DistributeApproval(vote) => {
let _span = state
.spans
.get(&vote.block_hash)
.map(|span| span.child("import-and-distribute-approval"))
.unwrap_or_else(|| jaeger::Span::new(&vote.block_hash, "distribute-approval"))
.with_string_tag("block-hash", format!("{:?}", vote.block_hash))
.with_stage(jaeger::Stage::ApprovalDistribution);
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
"Distributing our approval vote on candidate (block={}, index={})", "Distributing our approval vote on candidate (block={}, index={})",
@@ -20,9 +20,11 @@ use sp_keystore::KeystorePtr;
use polkadot_node_network_protocol::request_response::{v1, IncomingRequestReceiver}; use polkadot_node_network_protocol::request_response::{v1, IncomingRequestReceiver};
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
messages::AvailabilityDistributionMessage, overseer, FromOrchestra, OverseerSignal, jaeger, messages::AvailabilityDistributionMessage, overseer, FromOrchestra, OverseerSignal,
SpawnedSubsystem, SubsystemError, SpawnedSubsystem, SubsystemError,
}; };
use polkadot_primitives::Hash;
use std::collections::HashMap;
/// Error and [`Result`] type for this subsystem. /// Error and [`Result`] type for this subsystem.
mod error; mod error;
@@ -91,6 +93,7 @@ impl AvailabilityDistributionSubsystem {
/// Start processing work as passed on from the Overseer. /// Start processing work as passed on from the Overseer.
async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> { async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> {
let Self { mut runtime, recvs, metrics } = self; let Self { mut runtime, recvs, metrics } = self;
let mut spans: HashMap<Hash, jaeger::PerLeafSpan> = HashMap::new();
let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs; let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs;
let mut requester = Requester::new(metrics.clone()).fuse(); let mut requester = Requester::new(metrics.clone()).fuse();
@@ -131,15 +134,24 @@ impl AvailabilityDistributionSubsystem {
}; };
match message { match message {
FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
let cloned_leaf = match update.activated.clone() {
Some(activated) => activated,
None => continue,
};
let span =
jaeger::PerLeafSpan::new(cloned_leaf.span, "availability-distribution");
spans.insert(cloned_leaf.hash, span);
log_error( log_error(
requester requester
.get_mut() .get_mut()
.update_fetching_heads(&mut ctx, &mut runtime, update) .update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
.await, .await,
"Error in Requester::update_fetching_heads", "Error in Requester::update_fetching_heads",
)?; )?;
}, },
FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}, FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _)) => {
spans.remove(&hash);
},
FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOrchestra::Communication { FromOrchestra::Communication {
msg: msg:
@@ -152,6 +164,15 @@ impl AvailabilityDistributionSubsystem {
tx, tx,
}, },
} => { } => {
let span = spans
.get(&relay_parent)
.map(|span| span.child("fetch-pov"))
.unwrap_or_else(|| jaeger::Span::new(&relay_parent, "fetch-pov"))
.with_trace_id(candidate_hash)
.with_candidate(candidate_hash)
.with_relay_parent(relay_parent)
.with_stage(jaeger::Stage::AvailabilityDistribution);
log_error( log_error(
pov_requester::fetch_pov( pov_requester::fetch_pov(
&mut ctx, &mut ctx,
@@ -163,6 +184,7 @@ impl AvailabilityDistributionSubsystem {
pov_hash, pov_hash,
tx, tx,
metrics.clone(), metrics.clone(),
&span,
) )
.await, .await,
"pov_requester::fetch_pov", "pov_requester::fetch_pov",
@@ -52,7 +52,18 @@ pub async fn fetch_pov<Context>(
pov_hash: Hash, pov_hash: Hash,
tx: oneshot::Sender<PoV>, tx: oneshot::Sender<PoV>,
metrics: Metrics, metrics: Metrics,
span: &jaeger::Span,
) -> Result<()> { ) -> Result<()> {
let _span = span
.child("fetch-pov")
.with_trace_id(candidate_hash)
.with_validator_index(from_validator)
.with_candidate(candidate_hash)
.with_para_id(para_id)
.with_relay_parent(parent)
.with_string_tag("pov-hash", format!("{:?}", pov_hash))
.with_stage(jaeger::Stage::AvailabilityDistribution);
let info = &runtime.get_session_info(ctx.sender(), parent).await?.session_info; let info = &runtime.get_session_info(ctx.sender(), parent).await?.session_info;
let authority_id = info let authority_id = info
.discovery_keys .discovery_keys
@@ -71,13 +82,9 @@ pub async fn fetch_pov<Context>(
)) ))
.await; .await;
let span = jaeger::Span::new(candidate_hash, "fetch-pov")
.with_validator_index(from_validator)
.with_relay_parent(parent)
.with_para_id(para_id);
ctx.spawn( ctx.spawn(
"pov-fetcher", "pov-fetcher",
fetch_pov_job(para_id, pov_hash, authority_id, pending_response.boxed(), span, tx, metrics) fetch_pov_job(para_id, pov_hash, authority_id, pending_response.boxed(), tx, metrics)
.boxed(), .boxed(),
) )
.map_err(|e| FatalError::SpawnTask(e))?; .map_err(|e| FatalError::SpawnTask(e))?;
@@ -90,11 +97,10 @@ async fn fetch_pov_job(
pov_hash: Hash, pov_hash: Hash,
authority_id: AuthorityDiscoveryId, authority_id: AuthorityDiscoveryId,
pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>, pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>,
span: jaeger::Span,
tx: oneshot::Sender<PoV>, tx: oneshot::Sender<PoV>,
metrics: Metrics, metrics: Metrics,
) { ) {
if let Err(err) = do_fetch_pov(pov_hash, pending_response, span, tx, metrics).await { if let Err(err) = do_fetch_pov(pov_hash, pending_response, tx, metrics).await {
gum::warn!(target: LOG_TARGET, ?err, ?para_id, ?pov_hash, ?authority_id, "fetch_pov_job"); gum::warn!(target: LOG_TARGET, ?err, ?para_id, ?pov_hash, ?authority_id, "fetch_pov_job");
} }
} }
@@ -103,7 +109,6 @@ async fn fetch_pov_job(
async fn do_fetch_pov( async fn do_fetch_pov(
pov_hash: Hash, pov_hash: Hash,
pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>, pending_response: BoxFuture<'static, std::result::Result<PoVFetchingResponse, RequestError>>,
_span: jaeger::Span,
tx: oneshot::Sender<PoV>, tx: oneshot::Sender<PoV>,
metrics: Metrics, metrics: Metrics,
) -> Result<()> { ) -> Result<()> {
@@ -182,6 +187,7 @@ mod tests {
pov_hash, pov_hash,
tx, tx,
Metrics::new_dummy(), Metrics::new_dummy(),
&jaeger::Span::Disabled,
) )
.await .await
.expect("Should succeed"); .expect("Should succeed");
@@ -140,7 +140,18 @@ impl FetchTaskConfig {
sender: mpsc::Sender<FromFetchTask>, sender: mpsc::Sender<FromFetchTask>,
metrics: Metrics, metrics: Metrics,
session_info: &SessionInfo, session_info: &SessionInfo,
span: jaeger::Span,
) -> Self { ) -> Self {
let span = span
.child("fetch-task-config")
.with_trace_id(core.candidate_hash)
.with_string_tag("leaf", format!("{:?}", leaf))
.with_validator_index(session_info.our_index)
.with_uint_tag("group-index", core.group_responsible.0 as u64)
.with_relay_parent(core.candidate_descriptor.relay_parent)
.with_string_tag("pov-hash", format!("{:?}", core.candidate_descriptor.pov_hash))
.with_stage(jaeger::Stage::AvailabilityDistribution);
let live_in = vec![leaf].into_iter().collect(); let live_in = vec![leaf].into_iter().collect();
// Don't run tasks for our backing group: // Don't run tasks for our backing group:
@@ -148,9 +159,6 @@ impl FetchTaskConfig {
return FetchTaskConfig { live_in, prepared_running: None } return FetchTaskConfig { live_in, prepared_running: None }
} }
let span = jaeger::Span::new(core.candidate_hash, "availability-distribution")
.with_stage(jaeger::Stage::AvailabilityDistribution);
let prepared_running = RunningTask { let prepared_running = RunningTask {
session_index: session_info.session_index, session_index: session_info.session_index,
group_index: core.group_responsible, group_index: core.group_responsible,
@@ -251,20 +259,18 @@ impl RunningTask {
let mut bad_validators = Vec::new(); let mut bad_validators = Vec::new();
let mut succeeded = false; let mut succeeded = false;
let mut count: u32 = 0; let mut count: u32 = 0;
let mut _span = self let mut span = self.span.child("run-fetch-chunk-task").with_relay_parent(self.relay_parent);
.span
.child("fetch-task")
.with_chunk_index(self.request.index.0)
.with_relay_parent(self.relay_parent);
// Try validators in reverse order: // Try validators in reverse order:
while let Some(validator) = self.group.pop() { while let Some(validator) = self.group.pop() {
let _try_span = _span.child("try");
// Report retries: // Report retries:
if count > 0 { if count > 0 {
self.metrics.on_retry(); self.metrics.on_retry();
} }
count += 1; count += 1;
let _chunk_fetch_span = span
.child("fetch-chunk-request")
.with_chunk_index(self.request.index.0)
.with_stage(jaeger::Stage::AvailabilityDistribution);
// Send request: // Send request:
let resp = match self.do_request(&validator).await { let resp = match self.do_request(&validator).await {
Ok(resp) => resp, Ok(resp) => resp,
@@ -281,6 +287,12 @@ impl RunningTask {
continue continue
}, },
}; };
// We drop the span here, so that the span is not active while we recombine the chunk.
drop(_chunk_fetch_span);
let _chunk_recombine_span = span
.child("recombine-chunk")
.with_chunk_index(self.request.index.0)
.with_stage(jaeger::Stage::AvailabilityDistribution);
let chunk = match resp { let chunk = match resp {
ChunkFetchingResponse::Chunk(resp) => resp.recombine_into_chunk(&self.request), ChunkFetchingResponse::Chunk(resp) => resp.recombine_into_chunk(&self.request),
ChunkFetchingResponse::NoSuchChunk => { ChunkFetchingResponse::NoSuchChunk => {
@@ -298,6 +310,12 @@ impl RunningTask {
continue continue
}, },
}; };
// We drop the span so that the span is not active whilst we validate and store the chunk.
drop(_chunk_recombine_span);
let _chunk_validate_and_store_span = span
.child("validate-and-store-chunk")
.with_chunk_index(self.request.index.0)
.with_stage(jaeger::Stage::AvailabilityDistribution);
// Data genuine? // Data genuine?
if !self.validate_chunk(&validator, &chunk) { if !self.validate_chunk(&validator, &chunk) {
@@ -308,10 +326,9 @@ impl RunningTask {
// Ok, let's store it and be happy: // Ok, let's store it and be happy:
self.store_chunk(chunk).await; self.store_chunk(chunk).await;
succeeded = true; succeeded = true;
_span.add_string_tag("success", "true");
break break
} }
_span.add_int_tag("tries", count as _); span.add_int_tag("tries", count as _);
if succeeded { if succeeded {
self.metrics.on_fetch(SUCCEEDED); self.metrics.on_fetch(SUCCEEDED);
self.conclude(bad_validators).await; self.conclude(bad_validators).await;
@@ -33,6 +33,7 @@ use futures::{
}; };
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
jaeger,
messages::{ChainApiMessage, RuntimeApiMessage}, messages::{ChainApiMessage, RuntimeApiMessage},
overseer, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, overseer, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
}; };
@@ -100,14 +101,22 @@ impl Requester {
ctx: &mut Context, ctx: &mut Context,
runtime: &mut RuntimeInfo, runtime: &mut RuntimeInfo,
update: ActiveLeavesUpdate, update: ActiveLeavesUpdate,
spans: &HashMap<Hash, jaeger::PerLeafSpan>,
) -> Result<()> { ) -> Result<()> {
gum::trace!(target: LOG_TARGET, ?update, "Update fetching heads"); gum::trace!(target: LOG_TARGET, ?update, "Update fetching heads");
let ActiveLeavesUpdate { activated, deactivated } = update; let ActiveLeavesUpdate { activated, deactivated } = update;
// Stale leaves happen after a reversion - we don't want to re-run availability there. // Stale leaves happen after a reversion - we don't want to re-run availability there.
if let Some(leaf) = activated.filter(|leaf| leaf.status == LeafStatus::Fresh) { if let Some(leaf) = activated.filter(|leaf| leaf.status == LeafStatus::Fresh) {
let span = spans
.get(&leaf.hash)
.map(|span| span.child("update-fetching-heads"))
.unwrap_or_else(|| jaeger::Span::new(&leaf.hash, "update-fetching-heads"))
.with_string_tag("leaf", format!("{:?}", leaf.hash))
.with_stage(jaeger::Stage::AvailabilityDistribution);
// Order important! We need to handle activated, prior to deactivated, otherwise we might // Order important! We need to handle activated, prior to deactivated, otherwise we might
// cancel still needed jobs. // cancel still needed jobs.
self.start_requesting_chunks(ctx, runtime, leaf).await?; self.start_requesting_chunks(ctx, runtime, leaf, &span).await?;
} }
self.stop_requesting_chunks(deactivated.into_iter()); self.stop_requesting_chunks(deactivated.into_iter());
@@ -123,7 +132,13 @@ impl Requester {
ctx: &mut Context, ctx: &mut Context,
runtime: &mut RuntimeInfo, runtime: &mut RuntimeInfo,
new_head: ActivatedLeaf, new_head: ActivatedLeaf,
span: &jaeger::Span,
) -> Result<()> { ) -> Result<()> {
let mut span = span
.child("request-chunks-new-head")
.with_string_tag("leaf", format!("{:?}", new_head.hash))
.with_stage(jaeger::Stage::AvailabilityDistribution);
let sender = &mut ctx.sender().clone(); let sender = &mut ctx.sender().clone();
let ActivatedLeaf { hash: leaf, .. } = new_head; let ActivatedLeaf { hash: leaf, .. } = new_head;
let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session( let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session(
@@ -133,8 +148,15 @@ impl Requester {
Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION, Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION,
) )
.await?; .await?;
span.add_uint_tag("ancestors-in-session", ancestors_in_session.len() as u64);
// Also spawn or bump tasks for candidates in ancestry in the same session. // Also spawn or bump tasks for candidates in ancestry in the same session.
for hash in std::iter::once(leaf).chain(ancestors_in_session) { for hash in std::iter::once(leaf).chain(ancestors_in_session) {
let span = span
.child("request-chunks-ancestor")
.with_string_tag("leaf", format!("{:?}", hash.clone()))
.with_stage(jaeger::Stage::AvailabilityDistribution);
let cores = get_occupied_cores(sender, hash).await?; let cores = get_occupied_cores(sender, hash).await?;
gum::trace!( gum::trace!(
target: LOG_TARGET, target: LOG_TARGET,
@@ -148,7 +170,7 @@ impl Requester {
// The next time the subsystem receives leaf update, some of spawned task will be bumped // The next time the subsystem receives leaf update, some of spawned task will be bumped
// to be live in fresh relay parent, while some might get dropped due to the current leaf // to be live in fresh relay parent, while some might get dropped due to the current leaf
// being deactivated. // being deactivated.
self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?; self.add_cores(ctx, runtime, leaf, leaf_session_index, cores, span).await?;
} }
Ok(()) Ok(())
@@ -178,15 +200,24 @@ impl Requester {
leaf: Hash, leaf: Hash,
leaf_session_index: SessionIndex, leaf_session_index: SessionIndex,
cores: impl IntoIterator<Item = OccupiedCore>, cores: impl IntoIterator<Item = OccupiedCore>,
span: jaeger::Span,
) -> Result<()> { ) -> Result<()> {
for core in cores { for core in cores {
let mut span = span
.child("check-fetch-candidate")
.with_trace_id(core.candidate_hash)
.with_string_tag("leaf", format!("{:?}", leaf))
.with_candidate(core.candidate_hash)
.with_stage(jaeger::Stage::AvailabilityDistribution);
match self.fetches.entry(core.candidate_hash) { match self.fetches.entry(core.candidate_hash) {
Entry::Occupied(mut e) => Entry::Occupied(mut e) =>
// Just book keeping - we are already requesting that chunk: // Just book keeping - we are already requesting that chunk:
{ {
span.add_string_tag("already-requested-chunk", "true");
e.get_mut().add_leaf(leaf); e.get_mut().add_leaf(leaf);
}, },
Entry::Vacant(e) => { Entry::Vacant(e) => {
span.add_string_tag("already-requested-chunk", "false");
let tx = self.tx.clone(); let tx = self.tx.clone();
let metrics = self.metrics.clone(); let metrics = self.metrics.clone();
@@ -201,7 +232,7 @@ impl Requester {
// be fetchable by the state trie. // be fetchable by the state trie.
leaf, leaf,
leaf_session_index, leaf_session_index,
|info| FetchTaskConfig::new(leaf, &core, tx, metrics, info), |info| FetchTaskConfig::new(leaf, &core, tx, metrics, info, span),
) )
.await .await
.map_err(|err| { .map_err(|err| {
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::{future::Future, sync::Arc}; use std::{future::Future, sync::Arc};
use futures::FutureExt; use futures::FutureExt;
@@ -196,7 +198,7 @@ fn check_ancestry_lookup_in_same_session() {
test_harness(test_state.clone(), |mut ctx| async move { test_harness(test_state.clone(), |mut ctx| async move {
let chain = &test_state.relay_chain; let chain = &test_state.relay_chain;
let spans: HashMap<Hash, jaeger::PerLeafSpan> = HashMap::new();
let block_number = 1; let block_number = 1;
let update = ActiveLeavesUpdate { let update = ActiveLeavesUpdate {
activated: Some(ActivatedLeaf { activated: Some(ActivatedLeaf {
@@ -209,7 +211,7 @@ fn check_ancestry_lookup_in_same_session() {
}; };
requester requester
.update_fetching_heads(&mut ctx, &mut runtime, update) .update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
.await .await
.expect("Leaf processing failed"); .expect("Leaf processing failed");
let fetch_tasks = &requester.fetches; let fetch_tasks = &requester.fetches;
@@ -229,7 +231,7 @@ fn check_ancestry_lookup_in_same_session() {
}; };
requester requester
.update_fetching_heads(&mut ctx, &mut runtime, update) .update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
.await .await
.expect("Leaf processing failed"); .expect("Leaf processing failed");
let fetch_tasks = &requester.fetches; let fetch_tasks = &requester.fetches;
@@ -255,7 +257,7 @@ fn check_ancestry_lookup_in_same_session() {
deactivated: vec![chain[1], chain[2]].into(), deactivated: vec![chain[1], chain[2]].into(),
}; };
requester requester
.update_fetching_heads(&mut ctx, &mut runtime, update) .update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
.await .await
.expect("Leaf processing failed"); .expect("Leaf processing failed");
let fetch_tasks = &requester.fetches; let fetch_tasks = &requester.fetches;
@@ -283,7 +285,7 @@ fn check_ancestry_lookup_in_different_sessions() {
test_harness(test_state.clone(), |mut ctx| async move { test_harness(test_state.clone(), |mut ctx| async move {
let chain = &test_state.relay_chain; let chain = &test_state.relay_chain;
let spans: HashMap<Hash, jaeger::PerLeafSpan> = HashMap::new();
let block_number = 3; let block_number = 3;
let update = ActiveLeavesUpdate { let update = ActiveLeavesUpdate {
activated: Some(ActivatedLeaf { activated: Some(ActivatedLeaf {
@@ -296,7 +298,7 @@ fn check_ancestry_lookup_in_different_sessions() {
}; };
requester requester
.update_fetching_heads(&mut ctx, &mut runtime, update) .update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
.await .await
.expect("Leaf processing failed"); .expect("Leaf processing failed");
let fetch_tasks = &requester.fetches; let fetch_tasks = &requester.fetches;
@@ -314,7 +316,7 @@ fn check_ancestry_lookup_in_different_sessions() {
}; };
requester requester
.update_fetching_heads(&mut ctx, &mut runtime, update) .update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
.await .await
.expect("Leaf processing failed"); .expect("Leaf processing failed");
let fetch_tasks = &requester.fetches; let fetch_tasks = &requester.fetches;
@@ -332,7 +334,7 @@ fn check_ancestry_lookup_in_different_sessions() {
}; };
requester requester
.update_fetching_heads(&mut ctx, &mut runtime, update) .update_fetching_heads(&mut ctx, &mut runtime, update, &spans)
.await .await
.expect("Leaf processing failed"); .expect("Leaf processing failed");
let fetch_tasks = &requester.fetches; let fetch_tasks = &requester.fetches;
@@ -186,7 +186,10 @@ where
{ {
let span = jaeger::Span::new(req.payload.candidate_hash, "answer-chunk-request"); let span = jaeger::Span::new(req.payload.candidate_hash, "answer-chunk-request");
let _child_span = span.child("answer-chunk-request").with_chunk_index(req.payload.index.0); let _child_span = span
.child("answer-chunk-request")
.with_trace_id(req.payload.candidate_hash)
.with_chunk_index(req.payload.index.0);
let chunk = query_chunk(sender, req.payload.candidate_hash, req.payload.index).await?; let chunk = query_chunk(sender, req.payload.candidate_hash, req.payload.index).await?;