mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 18:07:58 +00:00
add better spans for unbacked candidates (#2132)
* add better spans for unbacked candidates * improve span names
This commit is contained in:
committed by
GitHub
parent
d0c97539e4
commit
7cbf4f0b79
@@ -37,7 +37,7 @@ use polkadot_node_primitives::{
|
||||
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
|
||||
};
|
||||
use polkadot_subsystem::{
|
||||
jaeger,
|
||||
jaeger::{self, JaegerSpan},
|
||||
messages::{
|
||||
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
|
||||
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
|
||||
@@ -134,6 +134,8 @@ struct CandidateBackingJob {
|
||||
assignment: Option<ParaId>,
|
||||
/// The collator required to author the candidate, if any.
|
||||
required_collator: Option<CollatorId>,
|
||||
/// Spans for all candidates that are not yet backable.
|
||||
unbacked_candidates: HashMap<CandidateHash, JaegerSpan>,
|
||||
/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
|
||||
issued_statements: HashSet<CandidateHash>,
|
||||
/// These candidates are undergoing validation in the background.
|
||||
@@ -371,6 +373,7 @@ struct BackgroundValidationParams<F> {
|
||||
pov: Option<Arc<PoV>>,
|
||||
validator_index: Option<ValidatorIndex>,
|
||||
n_validators: usize,
|
||||
span: JaegerSpan,
|
||||
make_command: F,
|
||||
}
|
||||
|
||||
@@ -385,19 +388,26 @@ async fn validate_and_make_available(
|
||||
pov,
|
||||
validator_index,
|
||||
n_validators,
|
||||
span,
|
||||
make_command,
|
||||
} = params;
|
||||
|
||||
let pov = match pov {
|
||||
Some(pov) => pov,
|
||||
None => request_pov_from_distribution(
|
||||
&mut tx_from,
|
||||
relay_parent,
|
||||
candidate.descriptor.clone(),
|
||||
).await?,
|
||||
None => {
|
||||
let _span = span.child("request-pov");
|
||||
request_pov_from_distribution(
|
||||
&mut tx_from,
|
||||
relay_parent,
|
||||
candidate.descriptor.clone(),
|
||||
).await?
|
||||
}
|
||||
};
|
||||
|
||||
let v = request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?;
|
||||
let v = {
|
||||
let _span = span.child("request-validation");
|
||||
request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?
|
||||
};
|
||||
|
||||
let expected_commitments_hash = candidate.commitments_hash;
|
||||
|
||||
@@ -413,6 +423,7 @@ async fn validate_and_make_available(
|
||||
);
|
||||
Err(candidate)
|
||||
} else {
|
||||
let _span = span.child("make-available");
|
||||
let erasure_valid = make_pov_available(
|
||||
&mut tx_from,
|
||||
validator_index,
|
||||
@@ -458,12 +469,12 @@ impl CandidateBackingJob {
|
||||
async fn run_loop(
|
||||
mut self,
|
||||
mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
|
||||
span: &jaeger::JaegerSpan
|
||||
span: &JaegerSpan
|
||||
) -> Result<(), Error> {
|
||||
loop {
|
||||
futures::select! {
|
||||
validated_command = self.background_validation.next() => {
|
||||
let _span = span.child("background validation");
|
||||
let _span = span.child("process-validation-result");
|
||||
if let Some(c) = validated_command {
|
||||
self.handle_validated_candidate_command(c).await?;
|
||||
} else {
|
||||
@@ -473,8 +484,10 @@ impl CandidateBackingJob {
|
||||
to_job = rx_to.next() => match to_job {
|
||||
None => break,
|
||||
Some(msg) => {
|
||||
let _span = span.child("process message");
|
||||
self.process_msg(msg).await?;
|
||||
// we intentionally want spans created in `process_msg` to descend from the
|
||||
// `span ` which is longer-lived than this ephemeral timing span.
|
||||
let _timing_span = span.child("process-message");
|
||||
self.process_msg(&span, msg).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -490,6 +503,7 @@ impl CandidateBackingJob {
|
||||
) -> Result<(), Error> {
|
||||
let candidate_hash = command.candidate_hash();
|
||||
self.awaiting_validation.remove(&candidate_hash);
|
||||
self.remove_unbacked_span(&candidate_hash);
|
||||
|
||||
match command {
|
||||
ValidatedCandidateCommand::Second(res) => {
|
||||
@@ -564,9 +578,10 @@ impl CandidateBackingJob {
|
||||
}
|
||||
|
||||
/// Kick off background validation with intent to second.
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))]
|
||||
async fn validate_and_second(
|
||||
&mut self,
|
||||
parent_span: &JaegerSpan,
|
||||
candidate: &CandidateReceipt,
|
||||
pov: Arc<PoV>,
|
||||
) -> Result<(), Error> {
|
||||
@@ -578,6 +593,11 @@ impl CandidateBackingJob {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let candidate_hash = candidate.hash();
|
||||
self.add_unbacked_span(&parent_span, candidate_hash);
|
||||
let span = self.get_unbacked_validation_child(&candidate_hash)
|
||||
.expect("just added unbacked span; qed");
|
||||
|
||||
self.background_validate_and_make_available(BackgroundValidationParams {
|
||||
tx_from: self.tx_from.clone(),
|
||||
tx_command: self.background_validation_tx.clone(),
|
||||
@@ -586,6 +606,7 @@ impl CandidateBackingJob {
|
||||
pov: Some(pov),
|
||||
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
|
||||
n_validators: self.table_context.validators.len(),
|
||||
span,
|
||||
make_command: ValidatedCandidateCommand::Second,
|
||||
}).await?;
|
||||
|
||||
@@ -655,6 +676,8 @@ impl CandidateBackingJob {
|
||||
// `HashSet::insert` returns true if the thing wasn't in there already.
|
||||
// one of the few places the Rust-std folks did a bad job with API
|
||||
if self.backed.insert(summary.candidate) {
|
||||
self.remove_unbacked_span(&summary.candidate);
|
||||
|
||||
if let Some(backed) =
|
||||
table_attested_to_backed(attested, &self.table_context)
|
||||
{
|
||||
@@ -673,8 +696,8 @@ impl CandidateBackingJob {
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
|
||||
#[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))]
|
||||
async fn process_msg(&mut self, span: &JaegerSpan, msg: CandidateBackingMessage) -> Result<(), Error> {
|
||||
match msg {
|
||||
CandidateBackingMessage::Second(_, candidate, pov) => {
|
||||
let _timer = self.metrics.time_process_second();
|
||||
@@ -693,7 +716,7 @@ impl CandidateBackingJob {
|
||||
let pov = Arc::new(pov);
|
||||
|
||||
if !self.issued_statements.contains(&candidate_hash) {
|
||||
self.validate_and_second(&candidate, pov.clone()).await?;
|
||||
self.validate_and_second(&span, &candidate, pov.clone()).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -701,7 +724,7 @@ impl CandidateBackingJob {
|
||||
let _timer = self.metrics.time_process_statement();
|
||||
|
||||
self.check_statement_signature(&statement)?;
|
||||
match self.maybe_validate_and_import(statement).await {
|
||||
match self.maybe_validate_and_import(&span, statement).await {
|
||||
Err(Error::ValidationFailed(_)) => return Ok(()),
|
||||
Err(e) => return Err(e),
|
||||
Ok(()) => (),
|
||||
@@ -726,10 +749,11 @@ impl CandidateBackingJob {
|
||||
}
|
||||
|
||||
/// Kick off validation work and distribute the result as a signed statement.
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))]
|
||||
async fn kick_off_validation_work(
|
||||
&mut self,
|
||||
summary: TableSummary,
|
||||
span: JaegerSpan,
|
||||
) -> Result<(), Error> {
|
||||
let candidate_hash = summary.candidate;
|
||||
|
||||
@@ -763,20 +787,26 @@ impl CandidateBackingJob {
|
||||
pov: None,
|
||||
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
|
||||
n_validators: self.table_context.validators.len(),
|
||||
span,
|
||||
make_command: ValidatedCandidateCommand::Attest,
|
||||
}).await
|
||||
}
|
||||
|
||||
/// Import the statement and kick off validation work if it is a part of our assignment.
|
||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||
#[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))]
|
||||
async fn maybe_validate_and_import(
|
||||
&mut self,
|
||||
parent_span: &JaegerSpan,
|
||||
statement: SignedFullStatement,
|
||||
) -> Result<(), Error> {
|
||||
if let Some(summary) = self.import_statement(&statement).await? {
|
||||
if let Statement::Seconded(_) = statement.payload() {
|
||||
self.add_unbacked_span(parent_span, summary.candidate);
|
||||
if Some(summary.group_id) == self.assignment {
|
||||
self.kick_off_validation_work(summary).await?;
|
||||
let span = self.get_unbacked_validation_child(&summary.candidate)
|
||||
.expect("just created unbacked span; qed");
|
||||
|
||||
self.kick_off_validation_work(summary, span).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -812,6 +842,22 @@ impl CandidateBackingJob {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_unbacked_span(&mut self, parent_span: &JaegerSpan, hash: CandidateHash) {
|
||||
self.unbacked_candidates.entry(hash).or_insert_with(|| {
|
||||
let mut span = parent_span.child("unbacked-candidate");
|
||||
span.add_string_tag("candidate-hash", &format!("{:?}", hash.0));
|
||||
span
|
||||
});
|
||||
}
|
||||
|
||||
fn get_unbacked_validation_child(&self, hash: &CandidateHash) -> Option<JaegerSpan> {
|
||||
self.unbacked_candidates.get(hash).map(|span| span.child("validation"))
|
||||
}
|
||||
|
||||
fn remove_unbacked_span(&mut self, hash: &CandidateHash) {
|
||||
self.unbacked_candidates.remove(hash);
|
||||
}
|
||||
|
||||
async fn send_to_provisioner(&mut self, msg: ProvisionerMessage) -> Result<(), Error> {
|
||||
self.tx_from.send(AllMessages::from(msg).into()).await?;
|
||||
|
||||
@@ -875,7 +921,7 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
}
|
||||
|
||||
let span = jaeger::hash_span(&parent, "run:backing");
|
||||
let _span = span.child("runtime apis");
|
||||
let _span = span.child("runtime-apis");
|
||||
|
||||
let (validators, groups, session_index, cores) = futures::try_join!(
|
||||
try_runtime_api!(request_validators(parent, &mut tx_from).await),
|
||||
@@ -894,7 +940,7 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
let cores = try_runtime_api!(cores);
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("validator construction");
|
||||
let _span = span.child("validator-construction");
|
||||
|
||||
let signing_context = SigningContext { parent_hash: parent, session_index };
|
||||
let validator = match Validator::construct(
|
||||
@@ -916,7 +962,7 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
};
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("calc validator groups");
|
||||
let _span = span.child("calc-validator-groups");
|
||||
|
||||
|
||||
let mut groups = HashMap::new();
|
||||
@@ -951,7 +997,7 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
};
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("wait for candidate backing job");
|
||||
let _span = span.child("wait-for-job");
|
||||
|
||||
let (background_tx, background_rx) = mpsc::channel(16);
|
||||
let job = CandidateBackingJob {
|
||||
@@ -962,6 +1008,7 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
issued_statements: HashSet::new(),
|
||||
awaiting_validation: HashSet::new(),
|
||||
seconded: None,
|
||||
unbacked_candidates: HashMap::new(),
|
||||
backed: HashSet::new(),
|
||||
reported_misbehavior_for: HashSet::new(),
|
||||
keystore,
|
||||
|
||||
Reference in New Issue
Block a user