mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 21:01:05 +00:00
initial jaeger integration (#2047)
Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com> Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
15c253117d
commit
a5fe710cc6
@@ -37,6 +37,7 @@ use polkadot_node_primitives::{
|
||||
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
|
||||
};
|
||||
use polkadot_subsystem::{
|
||||
jaeger,
|
||||
messages::{
|
||||
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
|
||||
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
|
||||
@@ -457,10 +458,12 @@ impl CandidateBackingJob {
|
||||
async fn run_loop(
|
||||
mut self,
|
||||
mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
|
||||
span: &jaeger::JaegerSpan
|
||||
) -> Result<(), Error> {
|
||||
loop {
|
||||
futures::select! {
|
||||
validated_command = self.background_validation.next() => {
|
||||
let _span = span.child("background validation");
|
||||
if let Some(c) = validated_command {
|
||||
self.handle_validated_candidate_command(c).await?;
|
||||
} else {
|
||||
@@ -470,6 +473,7 @@ impl CandidateBackingJob {
|
||||
to_job = rx_to.next() => match to_job {
|
||||
None => break,
|
||||
Some(msg) => {
|
||||
let _span = span.child("process message");
|
||||
self.process_msg(msg).await?;
|
||||
}
|
||||
}
|
||||
@@ -870,6 +874,9 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
}
|
||||
}
|
||||
|
||||
let span = jaeger::hash_span(&parent, "run:backing");
|
||||
let _span = span.child("runtime apis");
|
||||
|
||||
let (validators, groups, session_index, cores) = futures::try_join!(
|
||||
try_runtime_api!(request_validators(parent, &mut tx_from).await),
|
||||
try_runtime_api!(request_validator_groups(parent, &mut tx_from).await),
|
||||
@@ -886,6 +893,9 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
let session_index = try_runtime_api!(session_index);
|
||||
let cores = try_runtime_api!(cores);
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("validator construction");
|
||||
|
||||
let signing_context = SigningContext { parent_hash: parent, session_index };
|
||||
let validator = match Validator::construct(
|
||||
&validators,
|
||||
@@ -905,6 +915,10 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
}
|
||||
};
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("calc validator groups");
|
||||
|
||||
|
||||
let mut groups = HashMap::new();
|
||||
|
||||
let n_cores = cores.len();
|
||||
@@ -936,6 +950,9 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
Some((assignment, required_collator)) => (Some(assignment), required_collator),
|
||||
};
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("wait for candidate backing job");
|
||||
|
||||
let (background_tx, background_rx) = mpsc::channel(16);
|
||||
let job = CandidateBackingJob {
|
||||
parent,
|
||||
@@ -954,10 +971,10 @@ impl util::JobTrait for CandidateBackingJob {
|
||||
background_validation_tx: background_tx,
|
||||
metrics,
|
||||
};
|
||||
drop(_span);
|
||||
|
||||
job.run_loop(rx_to).await
|
||||
}
|
||||
.boxed()
|
||||
job.run_loop(rx_to, &span).await
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
|
||||
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
|
||||
use polkadot_node_subsystem::{
|
||||
jaeger,
|
||||
messages::{
|
||||
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
|
||||
BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest,
|
||||
@@ -75,7 +76,9 @@ async fn get_core_availability(
|
||||
validator_idx: ValidatorIndex,
|
||||
sender: &Mutex<&mut mpsc::Sender<FromJobCommand>>,
|
||||
) -> Result<bool, Error> {
|
||||
let span = jaeger::hash_span(&relay_parent, "core_availability");
|
||||
if let CoreState::Occupied(core) = core {
|
||||
let _span = span.child("occupied");
|
||||
let (tx, rx) = oneshot::channel();
|
||||
sender
|
||||
.lock()
|
||||
@@ -97,6 +100,10 @@ async fn get_core_availability(
|
||||
return Ok(false);
|
||||
}
|
||||
};
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("query chunk");
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
sender
|
||||
.lock()
|
||||
@@ -120,6 +127,7 @@ async fn get_availability_cores(
|
||||
relay_parent: Hash,
|
||||
sender: &mut mpsc::Sender<FromJobCommand>,
|
||||
) -> Result<Vec<CoreState>, Error> {
|
||||
let _span = jaeger::hash_span(&relay_parent, "get availability cores");
|
||||
let (tx, rx) = oneshot::channel();
|
||||
sender
|
||||
.send(AllMessages::from(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx))).into())
|
||||
@@ -226,6 +234,8 @@ impl JobTrait for BitfieldSigningJob {
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
let metrics = metrics.clone();
|
||||
async move {
|
||||
let span = jaeger::hash_span(&relay_parent, "run:bitfield-signing");
|
||||
let _span = span.child("delay");
|
||||
let wait_until = Instant::now() + JOB_DELAY;
|
||||
|
||||
// now do all the work we can before we need to wait for the availability store
|
||||
@@ -243,6 +253,9 @@ impl JobTrait for BitfieldSigningJob {
|
||||
// JOB_DELAY each time.
|
||||
let _timer = metrics.time_run();
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("availablity");
|
||||
|
||||
let bitfield =
|
||||
match construct_availability_bitfield(relay_parent, validator.index(), &mut sender).await
|
||||
{
|
||||
@@ -255,12 +268,18 @@ impl JobTrait for BitfieldSigningJob {
|
||||
Ok(bitfield) => bitfield,
|
||||
};
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("signing");
|
||||
|
||||
let signed_bitfield = validator
|
||||
.sign(keystore.clone(), bitfield)
|
||||
.await
|
||||
.map_err(|e| Error::Keystore(e))?;
|
||||
metrics.on_bitfield_signed();
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("gossip");
|
||||
|
||||
sender
|
||||
.send(
|
||||
AllMessages::from(
|
||||
|
||||
@@ -25,6 +25,7 @@ use futures::{
|
||||
};
|
||||
use sp_keystore::SyncCryptoStorePtr;
|
||||
use polkadot_node_subsystem::{
|
||||
jaeger,
|
||||
errors::ChainApiError,
|
||||
messages::{
|
||||
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
|
||||
@@ -99,7 +100,9 @@ impl JobTrait for CandidateSelectionJob {
|
||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||
mut sender: mpsc::Sender<FromJobCommand>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||
let span = jaeger::hash_span(&relay_parent, "candidate-selection:run");
|
||||
async move {
|
||||
let _span = span.child("query runtime");
|
||||
let (groups, cores) = futures::try_join!(
|
||||
try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await),
|
||||
try_runtime_api!(request_from_runtime(
|
||||
@@ -112,6 +115,9 @@ impl JobTrait for CandidateSelectionJob {
|
||||
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
|
||||
let cores = try_runtime_api!(cores);
|
||||
|
||||
drop(_span);
|
||||
let _span = span.child("find assignment");
|
||||
|
||||
let n_cores = cores.len();
|
||||
|
||||
let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await {
|
||||
@@ -141,7 +147,9 @@ impl JobTrait for CandidateSelectionJob {
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop().await
|
||||
drop(_span);
|
||||
|
||||
CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop(&span).await
|
||||
}.boxed()
|
||||
}
|
||||
}
|
||||
@@ -162,7 +170,8 @@ impl CandidateSelectionJob {
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_loop(&mut self) -> Result<(), Error> {
|
||||
async fn run_loop(&mut self, span: &jaeger::JaegerSpan) -> Result<(), Error> {
|
||||
let span = span.child("run loop");
|
||||
loop {
|
||||
match self.receiver.next().await {
|
||||
Some(CandidateSelectionMessage::Collation(
|
||||
@@ -170,12 +179,14 @@ impl CandidateSelectionJob {
|
||||
para_id,
|
||||
collator_id,
|
||||
)) => {
|
||||
let _span = span.child("handle collation");
|
||||
self.handle_collation(relay_parent, para_id, collator_id).await;
|
||||
}
|
||||
Some(CandidateSelectionMessage::Invalid(
|
||||
_,
|
||||
candidate_receipt,
|
||||
)) => {
|
||||
let _span = span.child("handle invalid");
|
||||
self.handle_invalid(candidate_receipt).await;
|
||||
}
|
||||
None => break,
|
||||
@@ -459,10 +470,10 @@ mod tests {
|
||||
};
|
||||
|
||||
preconditions(&mut job);
|
||||
|
||||
let span = jaeger::JaegerSpan::Disabled;
|
||||
let (_, job_result) = futures::executor::block_on(future::join(
|
||||
test(to_job_tx, from_job_rx),
|
||||
job.run_loop(),
|
||||
job.run_loop(&span),
|
||||
));
|
||||
|
||||
postconditions(job, job_result);
|
||||
|
||||
@@ -20,7 +20,10 @@
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::select;
|
||||
use polkadot_node_subsystem::{messages::{AllMessages, ProvisionerInherentData, ProvisionerMessage}, SubsystemError};
|
||||
use polkadot_node_subsystem::{
|
||||
jaeger,
|
||||
messages::{AllMessages, ProvisionerInherentData, ProvisionerMessage}, SubsystemError,
|
||||
};
|
||||
use polkadot_overseer::OverseerHandler;
|
||||
use polkadot_primitives::v1::{
|
||||
Block, Hash, Header,
|
||||
@@ -193,6 +196,9 @@ where
|
||||
record_proof: RecordProof,
|
||||
) -> Self::Proposal {
|
||||
async move {
|
||||
let span = jaeger::hash_span(&self.parent_header_hash, "propose");
|
||||
let _span = span.child("get provisioner");
|
||||
|
||||
let provisioner_data = match self.get_provisioner_data().await {
|
||||
Ok(pd) => pd,
|
||||
Err(err) => {
|
||||
@@ -201,11 +207,14 @@ where
|
||||
}
|
||||
};
|
||||
|
||||
drop(_span);
|
||||
|
||||
inherent_data.put_data(
|
||||
polkadot_primitives::v1::INCLUSION_INHERENT_IDENTIFIER,
|
||||
&provisioner_data,
|
||||
)?;
|
||||
|
||||
let _span = span.child("authorship propose");
|
||||
self.inner
|
||||
.propose(inherent_data, inherent_digests, max_duration, record_proof)
|
||||
.await
|
||||
|
||||
@@ -26,6 +26,7 @@ use futures::{
|
||||
};
|
||||
use polkadot_node_subsystem::{
|
||||
errors::{ChainApiError, RuntimeApiError},
|
||||
jaeger,
|
||||
messages::{
|
||||
AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
|
||||
ProvisionerMessage,
|
||||
@@ -154,10 +155,12 @@ impl JobTrait for ProvisioningJob {
|
||||
sender,
|
||||
receiver,
|
||||
);
|
||||
|
||||
let span = jaeger::hash_span(&relay_parent, "provisioner");
|
||||
|
||||
// it isn't necessary to break run_loop into its own function,
|
||||
// but it's convenient to separate the concerns in this way
|
||||
job.run_loop().await
|
||||
job.run_loop(&span).await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
@@ -183,15 +186,15 @@ impl ProvisioningJob {
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_loop(mut self) -> Result<(), Error> {
|
||||
async fn run_loop(mut self, span: &jaeger::JaegerSpan) -> Result<(), Error> {
|
||||
use ProvisionerMessage::{
|
||||
ProvisionableData, RequestBlockAuthorshipData, RequestInherentData,
|
||||
};
|
||||
|
||||
loop {
|
||||
futures::select! {
|
||||
msg = self.receiver.next().fuse() => match msg {
|
||||
Some(RequestInherentData(_, return_sender)) => {
|
||||
let _span = span.child("req inherent data");
|
||||
let _timer = self.metrics.time_request_inherent_data();
|
||||
|
||||
if self.inherent_after.is_ready() {
|
||||
@@ -201,9 +204,11 @@ impl ProvisioningJob {
|
||||
}
|
||||
}
|
||||
Some(RequestBlockAuthorshipData(_, sender)) => {
|
||||
let _span = span.child("req block authorship");
|
||||
self.provisionable_data_channels.push(sender)
|
||||
}
|
||||
Some(ProvisionableData(_, data)) => {
|
||||
let _span = span.child("provisionable data");
|
||||
let _timer = self.metrics.time_provisionable_data();
|
||||
|
||||
let mut bad_indices = Vec::new();
|
||||
@@ -241,6 +246,7 @@ impl ProvisioningJob {
|
||||
None => break,
|
||||
},
|
||||
_ = self.inherent_after.ready().fuse() => {
|
||||
let _span = span.child("send inherent data");
|
||||
let return_senders = std::mem::take(&mut self.awaiting_inherent);
|
||||
if !return_senders.is_empty() {
|
||||
self.send_inherent_data(return_senders).await;
|
||||
|
||||
Reference in New Issue
Block a user