diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index c54eef157b..3de6c37e61 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -37,7 +37,7 @@ use polkadot_node_primitives::{ FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult, }; use polkadot_subsystem::{ - jaeger, + jaeger::{self, JaegerSpan}, messages::{ AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage, PoVDistributionMessage, ProvisionableData, @@ -134,6 +134,8 @@ struct CandidateBackingJob { assignment: Option, /// The collator required to author the candidate, if any. required_collator: Option, + /// Spans for all candidates that are not yet backable. + unbacked_candidates: HashMap, /// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates. issued_statements: HashSet, /// These candidates are undergoing validation in the background. @@ -371,6 +373,7 @@ struct BackgroundValidationParams { pov: Option>, validator_index: Option, n_validators: usize, + span: JaegerSpan, make_command: F, } @@ -385,19 +388,26 @@ async fn validate_and_make_available( pov, validator_index, n_validators, + span, make_command, } = params; let pov = match pov { Some(pov) => pov, - None => request_pov_from_distribution( - &mut tx_from, - relay_parent, - candidate.descriptor.clone(), - ).await?, + None => { + let _span = span.child("request-pov"); + request_pov_from_distribution( + &mut tx_from, + relay_parent, + candidate.descriptor.clone(), + ).await? + } }; - let v = request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?; + let v = { + let _span = span.child("request-validation"); + request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await? + }; let expected_commitments_hash = candidate.commitments_hash; @@ -413,6 +423,7 @@ async fn validate_and_make_available( ); Err(candidate) } else { + let _span = span.child("make-available"); let erasure_valid = make_pov_available( &mut tx_from, validator_index, @@ -458,12 +469,12 @@ impl CandidateBackingJob { async fn run_loop( mut self, mut rx_to: mpsc::Receiver, - span: &jaeger::JaegerSpan + span: &JaegerSpan ) -> Result<(), Error> { loop { futures::select! { validated_command = self.background_validation.next() => { - let _span = span.child("background validation"); + let _span = span.child("process-validation-result"); if let Some(c) = validated_command { self.handle_validated_candidate_command(c).await?; } else { @@ -473,8 +484,10 @@ impl CandidateBackingJob { to_job = rx_to.next() => match to_job { None => break, Some(msg) => { - let _span = span.child("process message"); - self.process_msg(msg).await?; + // we intentionally want spans created in `process_msg` to descend from the + // `span ` which is longer-lived than this ephemeral timing span. + let _timing_span = span.child("process-message"); + self.process_msg(&span, msg).await?; } } } @@ -490,6 +503,7 @@ impl CandidateBackingJob { ) -> Result<(), Error> { let candidate_hash = command.candidate_hash(); self.awaiting_validation.remove(&candidate_hash); + self.remove_unbacked_span(&candidate_hash); match command { ValidatedCandidateCommand::Second(res) => { @@ -564,9 +578,10 @@ impl CandidateBackingJob { } /// Kick off background validation with intent to second. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))] async fn validate_and_second( &mut self, + parent_span: &JaegerSpan, candidate: &CandidateReceipt, pov: Arc, ) -> Result<(), Error> { @@ -578,6 +593,11 @@ impl CandidateBackingJob { return Ok(()); } + let candidate_hash = candidate.hash(); + self.add_unbacked_span(&parent_span, candidate_hash); + let span = self.get_unbacked_validation_child(&candidate_hash) + .expect("just added unbacked span; qed"); + self.background_validate_and_make_available(BackgroundValidationParams { tx_from: self.tx_from.clone(), tx_command: self.background_validation_tx.clone(), @@ -586,6 +606,7 @@ impl CandidateBackingJob { pov: Some(pov), validator_index: self.table_context.validator.as_ref().map(|v| v.index()), n_validators: self.table_context.validators.len(), + span, make_command: ValidatedCandidateCommand::Second, }).await?; @@ -655,6 +676,8 @@ impl CandidateBackingJob { // `HashSet::insert` returns true if the thing wasn't in there already. // one of the few places the Rust-std folks did a bad job with API if self.backed.insert(summary.candidate) { + self.remove_unbacked_span(&summary.candidate); + if let Some(backed) = table_attested_to_backed(attested, &self.table_context) { @@ -673,8 +696,8 @@ impl CandidateBackingJob { Ok(summary) } - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> { + #[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))] + async fn process_msg(&mut self, span: &JaegerSpan, msg: CandidateBackingMessage) -> Result<(), Error> { match msg { CandidateBackingMessage::Second(_, candidate, pov) => { let _timer = self.metrics.time_process_second(); @@ -693,7 +716,7 @@ impl CandidateBackingJob { let pov = Arc::new(pov); if !self.issued_statements.contains(&candidate_hash) { - self.validate_and_second(&candidate, pov.clone()).await?; + self.validate_and_second(&span, &candidate, pov.clone()).await?; } } } @@ -701,7 +724,7 @@ impl CandidateBackingJob { let _timer = self.metrics.time_process_statement(); self.check_statement_signature(&statement)?; - match self.maybe_validate_and_import(statement).await { + match self.maybe_validate_and_import(&span, statement).await { Err(Error::ValidationFailed(_)) => return Ok(()), Err(e) => return Err(e), Ok(()) => (), @@ -726,10 +749,11 @@ impl CandidateBackingJob { } /// Kick off validation work and distribute the result as a signed statement. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))] async fn kick_off_validation_work( &mut self, summary: TableSummary, + span: JaegerSpan, ) -> Result<(), Error> { let candidate_hash = summary.candidate; @@ -763,20 +787,26 @@ impl CandidateBackingJob { pov: None, validator_index: self.table_context.validator.as_ref().map(|v| v.index()), n_validators: self.table_context.validators.len(), + span, make_command: ValidatedCandidateCommand::Attest, }).await } /// Import the statement and kick off validation work if it is a part of our assignment. - #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))] async fn maybe_validate_and_import( &mut self, + parent_span: &JaegerSpan, statement: SignedFullStatement, ) -> Result<(), Error> { if let Some(summary) = self.import_statement(&statement).await? { if let Statement::Seconded(_) = statement.payload() { + self.add_unbacked_span(parent_span, summary.candidate); if Some(summary.group_id) == self.assignment { - self.kick_off_validation_work(summary).await?; + let span = self.get_unbacked_validation_child(&summary.candidate) + .expect("just created unbacked span; qed"); + + self.kick_off_validation_work(summary, span).await?; } } } @@ -812,6 +842,22 @@ impl CandidateBackingJob { Ok(()) } + fn add_unbacked_span(&mut self, parent_span: &JaegerSpan, hash: CandidateHash) { + self.unbacked_candidates.entry(hash).or_insert_with(|| { + let mut span = parent_span.child("unbacked-candidate"); + span.add_string_tag("candidate-hash", &format!("{:?}", hash.0)); + span + }); + } + + fn get_unbacked_validation_child(&self, hash: &CandidateHash) -> Option { + self.unbacked_candidates.get(hash).map(|span| span.child("validation")) + } + + fn remove_unbacked_span(&mut self, hash: &CandidateHash) { + self.unbacked_candidates.remove(hash); + } + async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> { self.tx_from.send(AllMessages::from(msg).into()).await?; @@ -875,7 +921,7 @@ impl util::JobTrait for CandidateBackingJob { } let span = jaeger::hash_span(&parent, "run:backing"); - let _span = span.child("runtime apis"); + let _span = span.child("runtime-apis"); let (validators, groups, session_index, cores) = futures::try_join!( try_runtime_api!(request_validators(parent, &mut tx_from).await), @@ -894,7 +940,7 @@ impl util::JobTrait for CandidateBackingJob { let cores = try_runtime_api!(cores); drop(_span); - let _span = span.child("validator construction"); + let _span = span.child("validator-construction"); let signing_context = SigningContext { parent_hash: parent, session_index }; let validator = match Validator::construct( @@ -916,7 +962,7 @@ impl util::JobTrait for CandidateBackingJob { }; drop(_span); - let _span = span.child("calc validator groups"); + let _span = span.child("calc-validator-groups"); let mut groups = HashMap::new(); @@ -951,7 +997,7 @@ impl util::JobTrait for CandidateBackingJob { }; drop(_span); - let _span = span.child("wait for candidate backing job"); + let _span = span.child("wait-for-job"); let (background_tx, background_rx) = mpsc::channel(16); let job = CandidateBackingJob { @@ -962,6 +1008,7 @@ impl util::JobTrait for CandidateBackingJob { issued_statements: HashSet::new(), awaiting_validation: HashSet::new(), seconded: None, + unbacked_candidates: HashMap::new(), backed: HashSet::new(), reported_misbehavior_for: HashSet::new(), keystore,