mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 17:31:03 +00:00
feat/jaeger: more spans, more stages (#2477)
* feat/jaeger: more spans, more stages Stage numbers are still arbitrarily picked. * feat/jaeger: additional spans * chore/spellcheck: improve the dictionary * fix/jaeger JaegerSpan -> jaeger::Span
This commit is contained in:
committed by
GitHub
parent
a5defa7c7f
commit
49c6aa9a76
@@ -37,6 +37,7 @@ instantiate/B
|
|||||||
intrinsic/MS
|
intrinsic/MS
|
||||||
intrinsics
|
intrinsics
|
||||||
io
|
io
|
||||||
|
jaeger/MS
|
||||||
js
|
js
|
||||||
keccak256/M
|
keccak256/M
|
||||||
KSM/S
|
KSM/S
|
||||||
@@ -68,6 +69,7 @@ struct/MS
|
|||||||
subsystem/MS
|
subsystem/MS
|
||||||
subsystems'
|
subsystems'
|
||||||
taskmanager/MS
|
taskmanager/MS
|
||||||
|
TCP
|
||||||
teleport/RG
|
teleport/RG
|
||||||
teleportation/SM
|
teleportation/SM
|
||||||
teleporter/SM
|
teleporter/SM
|
||||||
@@ -76,6 +78,8 @@ testnet/MS
|
|||||||
trie/MS
|
trie/MS
|
||||||
trustless/Y
|
trustless/Y
|
||||||
ubuntu/M
|
ubuntu/M
|
||||||
|
UDP
|
||||||
|
UI
|
||||||
union/MSG
|
union/MSG
|
||||||
unservable/B
|
unservable/B
|
||||||
validator/SM
|
validator/SM
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ use polkadot_primitives::v1::{
|
|||||||
};
|
};
|
||||||
use polkadot_node_subsystem_util::TimeoutExt;
|
use polkadot_node_subsystem_util::TimeoutExt;
|
||||||
use polkadot_subsystem::{
|
use polkadot_subsystem::{
|
||||||
ActiveLeavesUpdate, errors::RuntimeApiError, JaegerSpan, messages::AllMessages,
|
ActiveLeavesUpdate, errors::RuntimeApiError, jaeger, messages::AllMessages,
|
||||||
};
|
};
|
||||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||||
use sp_keyring::Sr25519Keyring;
|
use sp_keyring::Sr25519Keyring;
|
||||||
@@ -240,7 +240,7 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
|
activated: vec![(new_leaf, Arc::new(jaeger::Span::Disabled))].into(),
|
||||||
deactivated: vec![].into(),
|
deactivated: vec![].into(),
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -885,7 +885,7 @@ async fn import_leaf(
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
virtual_overseer,
|
virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
|
activated: vec![(new_leaf, Arc::new(jaeger::Span::Disabled))].into(),
|
||||||
deactivated: vec![].into(),
|
deactivated: vec![].into(),
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
|
|||||||
@@ -35,7 +35,8 @@ use polkadot_node_primitives::{
|
|||||||
Statement, SignedFullStatement, ValidationResult,
|
Statement, SignedFullStatement, ValidationResult,
|
||||||
};
|
};
|
||||||
use polkadot_subsystem::{
|
use polkadot_subsystem::{
|
||||||
JaegerSpan, PerLeafSpan,
|
PerLeafSpan, Stage,
|
||||||
|
jaeger,
|
||||||
messages::{
|
messages::{
|
||||||
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
|
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
|
||||||
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
|
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
|
||||||
@@ -134,7 +135,7 @@ struct CandidateBackingJob {
|
|||||||
/// The collator required to author the candidate, if any.
|
/// The collator required to author the candidate, if any.
|
||||||
required_collator: Option<CollatorId>,
|
required_collator: Option<CollatorId>,
|
||||||
/// Spans for all candidates that are not yet backable.
|
/// Spans for all candidates that are not yet backable.
|
||||||
unbacked_candidates: HashMap<CandidateHash, JaegerSpan>,
|
unbacked_candidates: HashMap<CandidateHash, jaeger::Span>,
|
||||||
/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
|
/// We issued `Seconded`, `Valid` or `Invalid` statements on about these candidates.
|
||||||
issued_statements: HashSet<CandidateHash>,
|
issued_statements: HashSet<CandidateHash>,
|
||||||
/// These candidates are undergoing validation in the background.
|
/// These candidates are undergoing validation in the background.
|
||||||
@@ -294,7 +295,7 @@ async fn make_pov_available(
|
|||||||
candidate_hash: CandidateHash,
|
candidate_hash: CandidateHash,
|
||||||
validation_data: polkadot_primitives::v1::PersistedValidationData,
|
validation_data: polkadot_primitives::v1::PersistedValidationData,
|
||||||
expected_erasure_root: Hash,
|
expected_erasure_root: Hash,
|
||||||
span: Option<&JaegerSpan>,
|
span: Option<&jaeger::Span>,
|
||||||
) -> Result<Result<(), InvalidErasureRoot>, Error> {
|
) -> Result<Result<(), InvalidErasureRoot>, Error> {
|
||||||
let available_data = AvailableData {
|
let available_data = AvailableData {
|
||||||
pov,
|
pov,
|
||||||
@@ -383,7 +384,7 @@ struct BackgroundValidationParams<F> {
|
|||||||
pov: Option<Arc<PoV>>,
|
pov: Option<Arc<PoV>>,
|
||||||
validator_index: Option<ValidatorIndex>,
|
validator_index: Option<ValidatorIndex>,
|
||||||
n_validators: usize,
|
n_validators: usize,
|
||||||
span: Option<JaegerSpan>,
|
span: Option<jaeger::Span>,
|
||||||
make_command: F,
|
make_command: F,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -415,7 +416,11 @@ async fn validate_and_make_available(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let v = {
|
let v = {
|
||||||
let _span = span.as_ref().map(|s| s.child("request-validation"));
|
let _span = span.as_ref().map(|s| {
|
||||||
|
s.child_builder("request-validation")
|
||||||
|
.with_pov(&pov)
|
||||||
|
.build()
|
||||||
|
});
|
||||||
request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?
|
request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -513,7 +518,7 @@ impl CandidateBackingJob {
|
|||||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||||
async fn handle_validated_candidate_command(
|
async fn handle_validated_candidate_command(
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_span: &JaegerSpan,
|
parent_span: &jaeger::Span,
|
||||||
command: ValidatedCandidateCommand,
|
command: ValidatedCandidateCommand,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let candidate_hash = command.candidate_hash();
|
let candidate_hash = command.candidate_hash();
|
||||||
@@ -609,7 +614,7 @@ impl CandidateBackingJob {
|
|||||||
#[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))]
|
||||||
async fn validate_and_second(
|
async fn validate_and_second(
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_span: &JaegerSpan,
|
parent_span: &jaeger::Span,
|
||||||
candidate: &CandidateReceipt,
|
candidate: &CandidateReceipt,
|
||||||
pov: Arc<PoV>,
|
pov: Arc<PoV>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
@@ -649,7 +654,7 @@ impl CandidateBackingJob {
|
|||||||
async fn sign_import_and_distribute_statement(
|
async fn sign_import_and_distribute_statement(
|
||||||
&mut self,
|
&mut self,
|
||||||
statement: Statement,
|
statement: Statement,
|
||||||
parent_span: &JaegerSpan,
|
parent_span: &jaeger::Span,
|
||||||
) -> Result<Option<SignedFullStatement>, Error> {
|
) -> Result<Option<SignedFullStatement>, Error> {
|
||||||
if let Some(signed_statement) = self.sign_statement(statement).await {
|
if let Some(signed_statement) = self.sign_statement(statement).await {
|
||||||
self.import_statement(&signed_statement, parent_span).await?;
|
self.import_statement(&signed_statement, parent_span).await?;
|
||||||
@@ -682,7 +687,7 @@ impl CandidateBackingJob {
|
|||||||
async fn import_statement(
|
async fn import_statement(
|
||||||
&mut self,
|
&mut self,
|
||||||
statement: &SignedFullStatement,
|
statement: &SignedFullStatement,
|
||||||
parent_span: &JaegerSpan,
|
parent_span: &jaeger::Span,
|
||||||
) -> Result<Option<TableSummary>, Error> {
|
) -> Result<Option<TableSummary>, Error> {
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
target: LOG_TARGET,
|
target: LOG_TARGET,
|
||||||
@@ -745,11 +750,18 @@ impl CandidateBackingJob {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))]
|
||||||
async fn process_msg(&mut self, span: &JaegerSpan, msg: CandidateBackingMessage) -> Result<(), Error> {
|
async fn process_msg(&mut self, span: &jaeger::Span, msg: CandidateBackingMessage) -> Result<(), Error> {
|
||||||
match msg {
|
match msg {
|
||||||
CandidateBackingMessage::Second(_, candidate, pov) => {
|
CandidateBackingMessage::Second(_relay_parent, candidate, pov) => {
|
||||||
let _timer = self.metrics.time_process_second();
|
let _timer = self.metrics.time_process_second();
|
||||||
|
|
||||||
|
let span = span.child_builder("second")
|
||||||
|
.with_stage(jaeger::Stage::CandidateBacking)
|
||||||
|
.with_pov(&pov)
|
||||||
|
.with_candidate(&candidate.hash())
|
||||||
|
.with_relay_parent(&_relay_parent)
|
||||||
|
.build();
|
||||||
|
|
||||||
// Sanity check that candidate is from our assignment.
|
// Sanity check that candidate is from our assignment.
|
||||||
if Some(candidate.descriptor().para_id) != self.assignment {
|
if Some(candidate.descriptor().para_id) != self.assignment {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@@ -768,8 +780,13 @@ impl CandidateBackingJob {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
CandidateBackingMessage::Statement(_, statement) => {
|
CandidateBackingMessage::Statement(_relay_parent, statement) => {
|
||||||
let _timer = self.metrics.time_process_statement();
|
let _timer = self.metrics.time_process_statement();
|
||||||
|
let span = span.child_builder("statement")
|
||||||
|
.with_stage(jaeger::Stage::CandidateBacking)
|
||||||
|
.with_candidate(&statement.payload().candidate_hash())
|
||||||
|
.with_relay_parent(&_relay_parent)
|
||||||
|
.build();
|
||||||
|
|
||||||
self.check_statement_signature(&statement)?;
|
self.check_statement_signature(&statement)?;
|
||||||
match self.maybe_validate_and_import(&span, statement).await {
|
match self.maybe_validate_and_import(&span, statement).await {
|
||||||
@@ -801,7 +818,7 @@ impl CandidateBackingJob {
|
|||||||
async fn kick_off_validation_work(
|
async fn kick_off_validation_work(
|
||||||
&mut self,
|
&mut self,
|
||||||
summary: TableSummary,
|
summary: TableSummary,
|
||||||
span: Option<JaegerSpan>,
|
span: Option<jaeger::Span>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let candidate_hash = summary.candidate;
|
let candidate_hash = summary.candidate;
|
||||||
|
|
||||||
@@ -851,7 +868,7 @@ impl CandidateBackingJob {
|
|||||||
#[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(level = "trace", skip(self, parent_span), fields(subsystem = LOG_TARGET))]
|
||||||
async fn maybe_validate_and_import(
|
async fn maybe_validate_and_import(
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_span: &JaegerSpan,
|
parent_span: &jaeger::Span,
|
||||||
statement: SignedFullStatement,
|
statement: SignedFullStatement,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
if let Some(summary) = self.import_statement(&statement, parent_span).await? {
|
if let Some(summary) = self.import_statement(&statement, parent_span).await? {
|
||||||
@@ -896,7 +913,7 @@ impl CandidateBackingJob {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Insert or get the unbacked-span for the given candidate hash.
|
/// Insert or get the unbacked-span for the given candidate hash.
|
||||||
fn insert_or_get_unbacked_span(&mut self, parent_span: &JaegerSpan, hash: CandidateHash) -> Option<&JaegerSpan> {
|
fn insert_or_get_unbacked_span(&mut self, parent_span: &jaeger::Span, hash: CandidateHash) -> Option<&jaeger::Span> {
|
||||||
if !self.backed.contains(&hash) {
|
if !self.backed.contains(&hash) {
|
||||||
// only add if we don't consider this backed.
|
// only add if we don't consider this backed.
|
||||||
let span = self.unbacked_candidates.entry(hash).or_insert_with(|| {
|
let span = self.unbacked_candidates.entry(hash).or_insert_with(|| {
|
||||||
@@ -908,16 +925,22 @@ impl CandidateBackingJob {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_unbacked_validation_child(&mut self, parent_span: &JaegerSpan, hash: CandidateHash) -> Option<JaegerSpan> {
|
fn get_unbacked_validation_child(&mut self, parent_span: &jaeger::Span, hash: CandidateHash) -> Option<jaeger::Span> {
|
||||||
self.insert_or_get_unbacked_span(parent_span, hash).map(|span| span.child_with_candidate("validation", &hash))
|
self.insert_or_get_unbacked_span(parent_span, hash)
|
||||||
|
.map(|span| {
|
||||||
|
span.child_builder("validation")
|
||||||
|
.with_candidate(&hash)
|
||||||
|
.with_stage(Stage::CandidateBacking)
|
||||||
|
.build()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_unbacked_statement_child(
|
fn get_unbacked_statement_child(
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_span: &JaegerSpan,
|
parent_span: &jaeger::Span,
|
||||||
hash: CandidateHash,
|
hash: CandidateHash,
|
||||||
validator: ValidatorIndex,
|
validator: ValidatorIndex,
|
||||||
) -> Option<JaegerSpan> {
|
) -> Option<jaeger::Span> {
|
||||||
self.insert_or_get_unbacked_span(parent_span, hash).map(|span| {
|
self.insert_or_get_unbacked_span(parent_span, hash).map(|span| {
|
||||||
span.child_builder("import-statement")
|
span.child_builder("import-statement")
|
||||||
.with_candidate(&hash)
|
.with_candidate(&hash)
|
||||||
@@ -926,7 +949,7 @@ impl CandidateBackingJob {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_unbacked_span(&mut self, hash: &CandidateHash) -> Option<JaegerSpan> {
|
fn remove_unbacked_span(&mut self, hash: &CandidateHash) -> Option<jaeger::Span> {
|
||||||
self.unbacked_candidates.remove(hash)
|
self.unbacked_candidates.remove(hash)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -966,7 +989,7 @@ impl util::JobTrait for CandidateBackingJob {
|
|||||||
#[tracing::instrument(skip(span, keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(skip(span, keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
|
||||||
fn run(
|
fn run(
|
||||||
parent: Hash,
|
parent: Hash,
|
||||||
span: Arc<JaegerSpan>,
|
span: Arc<jaeger::Span>,
|
||||||
keystore: SyncCryptoStorePtr,
|
keystore: SyncCryptoStorePtr,
|
||||||
metrics: Metrics,
|
metrics: Metrics,
|
||||||
rx_to: mpsc::Receiver<Self::ToJob>,
|
rx_to: mpsc::Receiver<Self::ToJob>,
|
||||||
@@ -1379,7 +1402,7 @@ mod tests {
|
|||||||
virtual_overseer.send(FromOverseer::Signal(
|
virtual_overseer.send(FromOverseer::Signal(
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
|
||||||
test_state.relay_parent,
|
test_state.relay_parent,
|
||||||
Arc::new(JaegerSpan::Disabled),
|
Arc::new(jaeger::Span::Disabled),
|
||||||
)))
|
)))
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
|
|||||||
@@ -23,7 +23,7 @@
|
|||||||
use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
|
use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
|
||||||
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
|
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
|
||||||
use polkadot_node_subsystem::{
|
use polkadot_node_subsystem::{
|
||||||
jaeger, PerLeafSpan, JaegerSpan,
|
jaeger, PerLeafSpan,
|
||||||
messages::{
|
messages::{
|
||||||
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
|
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
|
||||||
BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest,
|
BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest,
|
||||||
@@ -75,7 +75,7 @@ async fn get_core_availability(
|
|||||||
core: CoreState,
|
core: CoreState,
|
||||||
validator_idx: ValidatorIndex,
|
validator_idx: ValidatorIndex,
|
||||||
sender: &Mutex<&mut mpsc::Sender<FromJobCommand>>,
|
sender: &Mutex<&mut mpsc::Sender<FromJobCommand>>,
|
||||||
span: &jaeger::JaegerSpan,
|
span: &jaeger::Span,
|
||||||
) -> Result<bool, Error> {
|
) -> Result<bool, Error> {
|
||||||
if let CoreState::Occupied(core) = core {
|
if let CoreState::Occupied(core) = core {
|
||||||
let _span = span.child("query-chunk-availability");
|
let _span = span.child("query-chunk-availability");
|
||||||
@@ -132,7 +132,7 @@ async fn get_availability_cores(
|
|||||||
#[tracing::instrument(level = "trace", skip(sender, span), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(level = "trace", skip(sender, span), fields(subsystem = LOG_TARGET))]
|
||||||
async fn construct_availability_bitfield(
|
async fn construct_availability_bitfield(
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
span: &jaeger::JaegerSpan,
|
span: &jaeger::Span,
|
||||||
validator_idx: ValidatorIndex,
|
validator_idx: ValidatorIndex,
|
||||||
sender: &mut mpsc::Sender<FromJobCommand>,
|
sender: &mut mpsc::Sender<FromJobCommand>,
|
||||||
) -> Result<AvailabilityBitfield, Error> {
|
) -> Result<AvailabilityBitfield, Error> {
|
||||||
@@ -218,7 +218,7 @@ impl JobTrait for BitfieldSigningJob {
|
|||||||
#[tracing::instrument(skip(span, keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(skip(span, keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
|
||||||
fn run(
|
fn run(
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
span: Arc<JaegerSpan>,
|
span: Arc<jaeger::Span>,
|
||||||
keystore: Self::RunArgs,
|
keystore: Self::RunArgs,
|
||||||
metrics: Self::Metrics,
|
metrics: Self::Metrics,
|
||||||
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
|
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
|
||||||
@@ -321,7 +321,7 @@ mod tests {
|
|||||||
|
|
||||||
let future = construct_availability_bitfield(
|
let future = construct_availability_bitfield(
|
||||||
relay_parent,
|
relay_parent,
|
||||||
&jaeger::JaegerSpan::Disabled,
|
&jaeger::Span::Disabled,
|
||||||
validator_index,
|
validator_index,
|
||||||
&mut sender,
|
&mut sender,
|
||||||
).fuse();
|
).fuse();
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ use futures::{
|
|||||||
};
|
};
|
||||||
use sp_keystore::SyncCryptoStorePtr;
|
use sp_keystore::SyncCryptoStorePtr;
|
||||||
use polkadot_node_subsystem::{
|
use polkadot_node_subsystem::{
|
||||||
jaeger, JaegerSpan, PerLeafSpan,
|
jaeger, PerLeafSpan,
|
||||||
errors::ChainApiError,
|
errors::ChainApiError,
|
||||||
messages::{
|
messages::{
|
||||||
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
|
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
|
||||||
@@ -96,7 +96,7 @@ impl JobTrait for CandidateSelectionJob {
|
|||||||
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
|
||||||
fn run(
|
fn run(
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
span: Arc<JaegerSpan>,
|
span: Arc<jaeger::Span>,
|
||||||
keystore: Self::RunArgs,
|
keystore: Self::RunArgs,
|
||||||
metrics: Self::Metrics,
|
metrics: Self::Metrics,
|
||||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||||
@@ -104,7 +104,10 @@ impl JobTrait for CandidateSelectionJob {
|
|||||||
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
|
||||||
let span = PerLeafSpan::new(span, "candidate-selection");
|
let span = PerLeafSpan::new(span, "candidate-selection");
|
||||||
async move {
|
async move {
|
||||||
let _span = span.child("query-runtime");
|
let _span = span.child_builder("query-runtime")
|
||||||
|
.with_relay_parent(&relay_parent)
|
||||||
|
.with_stage(jaeger::Stage::CandidateSelection)
|
||||||
|
.build();
|
||||||
let (groups, cores) = futures::try_join!(
|
let (groups, cores) = futures::try_join!(
|
||||||
try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await),
|
try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await),
|
||||||
try_runtime_api!(request_from_runtime(
|
try_runtime_api!(request_from_runtime(
|
||||||
@@ -118,7 +121,10 @@ impl JobTrait for CandidateSelectionJob {
|
|||||||
let cores = try_runtime_api!(cores);
|
let cores = try_runtime_api!(cores);
|
||||||
|
|
||||||
drop(_span);
|
drop(_span);
|
||||||
let _span = span.child("find-assignment");
|
let _span = span.child_builder("find-assignment")
|
||||||
|
.with_relay_parent(&relay_parent)
|
||||||
|
.with_stage(jaeger::Stage::CandidateSelection)
|
||||||
|
.build();
|
||||||
|
|
||||||
let n_cores = cores.len();
|
let n_cores = cores.len();
|
||||||
|
|
||||||
@@ -172,8 +178,11 @@ impl CandidateSelectionJob {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_loop(&mut self, span: &jaeger::JaegerSpan) -> Result<(), Error> {
|
async fn run_loop(&mut self, span: &jaeger::Span) -> Result<(), Error> {
|
||||||
let span = span.child("run-loop");
|
let span = span.child_builder("run-loop")
|
||||||
|
.with_stage(jaeger::Stage::CandidateSelection)
|
||||||
|
.build();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match self.receiver.next().await {
|
match self.receiver.next().await {
|
||||||
Some(CandidateSelectionMessage::Collation(
|
Some(CandidateSelectionMessage::Collation(
|
||||||
@@ -185,14 +194,22 @@ impl CandidateSelectionJob {
|
|||||||
self.handle_collation(relay_parent, para_id, collator_id).await;
|
self.handle_collation(relay_parent, para_id, collator_id).await;
|
||||||
}
|
}
|
||||||
Some(CandidateSelectionMessage::Invalid(
|
Some(CandidateSelectionMessage::Invalid(
|
||||||
_,
|
_relay_parent,
|
||||||
candidate_receipt,
|
candidate_receipt,
|
||||||
)) => {
|
)) => {
|
||||||
let _span = span.child("handle-invalid");
|
let _span = span.child_builder("handle-invalid")
|
||||||
|
.with_stage(jaeger::Stage::CandidateSelection)
|
||||||
|
.with_candidate(&candidate_receipt.hash())
|
||||||
|
.with_relay_parent(&_relay_parent)
|
||||||
|
.build();
|
||||||
self.handle_invalid(candidate_receipt).await;
|
self.handle_invalid(candidate_receipt).await;
|
||||||
}
|
}
|
||||||
Some(CandidateSelectionMessage::Seconded(_, statement)) => {
|
Some(CandidateSelectionMessage::Seconded(_relay_parent, statement)) => {
|
||||||
let _span = span.child("handle-seconded");
|
let _span = span.child_builder("handle-seconded")
|
||||||
|
.with_stage(jaeger::Stage::CandidateSelection)
|
||||||
|
.with_candidate(&statement.payload().candidate_hash())
|
||||||
|
.with_relay_parent(&_relay_parent)
|
||||||
|
.build();
|
||||||
self.handle_seconded(statement).await;
|
self.handle_seconded(statement).await;
|
||||||
}
|
}
|
||||||
None => break,
|
None => break,
|
||||||
@@ -514,7 +531,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
|
|
||||||
preconditions(&mut job);
|
preconditions(&mut job);
|
||||||
let span = jaeger::JaegerSpan::Disabled;
|
let span = jaeger::Span::Disabled;
|
||||||
let (_, job_result) = futures::executor::block_on(future::join(
|
let (_, job_result) = futures::executor::block_on(future::join(
|
||||||
test(to_job_tx, from_job_rx),
|
test(to_job_tx, from_job_rx),
|
||||||
job.run_loop(&span),
|
job.run_loop(&span),
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ use futures::{
|
|||||||
prelude::*,
|
prelude::*,
|
||||||
};
|
};
|
||||||
use polkadot_node_subsystem::{
|
use polkadot_node_subsystem::{
|
||||||
errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, JaegerSpan,
|
errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, jaeger,
|
||||||
messages::{
|
messages::{
|
||||||
AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
|
AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
|
||||||
ProvisionerMessage,
|
ProvisionerMessage,
|
||||||
@@ -141,7 +141,7 @@ impl JobTrait for ProvisioningJob {
|
|||||||
#[tracing::instrument(skip(span, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(skip(span, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
|
||||||
fn run(
|
fn run(
|
||||||
relay_parent: Hash,
|
relay_parent: Hash,
|
||||||
span: Arc<JaegerSpan>,
|
span: Arc<jaeger::Span>,
|
||||||
_run_args: Self::RunArgs,
|
_run_args: Self::RunArgs,
|
||||||
metrics: Self::Metrics,
|
metrics: Self::Metrics,
|
||||||
receiver: mpsc::Receiver<ProvisionerMessage>,
|
receiver: mpsc::Receiver<ProvisionerMessage>,
|
||||||
|
|||||||
@@ -125,8 +125,8 @@ impl JaegerConfigBuilder {
|
|||||||
/// This just works as auxiliary structure to easily store both.
|
/// This just works as auxiliary structure to easily store both.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PerLeafSpan {
|
pub struct PerLeafSpan {
|
||||||
leaf_span: Arc<JaegerSpan>,
|
leaf_span: Arc<Span>,
|
||||||
span: JaegerSpan,
|
span: Span,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PerLeafSpan {
|
impl PerLeafSpan {
|
||||||
@@ -135,7 +135,7 @@ impl PerLeafSpan {
|
|||||||
/// Takes the `leaf_span` that is created by the overseer per leaf and a name for a child span.
|
/// Takes the `leaf_span` that is created by the overseer per leaf and a name for a child span.
|
||||||
/// Both will be stored in this object, while the child span is implicitly accessible by using the
|
/// Both will be stored in this object, while the child span is implicitly accessible by using the
|
||||||
/// [`Deref`](std::ops::Deref) implementation.
|
/// [`Deref`](std::ops::Deref) implementation.
|
||||||
pub fn new(leaf_span: Arc<JaegerSpan>, name: &'static str) -> Self {
|
pub fn new(leaf_span: Arc<Span>, name: &'static str) -> Self {
|
||||||
let span = leaf_span.child(name);
|
let span = leaf_span.child(name);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
@@ -145,16 +145,16 @@ impl PerLeafSpan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the leaf span.
|
/// Returns the leaf span.
|
||||||
pub fn leaf_span(&self) -> &Arc<JaegerSpan> {
|
pub fn leaf_span(&self) -> &Arc<Span> {
|
||||||
&self.leaf_span
|
&self.leaf_span
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a reference to the child span.
|
/// Returns a reference to the child span.
|
||||||
impl std::ops::Deref for PerLeafSpan {
|
impl std::ops::Deref for PerLeafSpan {
|
||||||
type Target = JaegerSpan;
|
type Target = Span;
|
||||||
|
|
||||||
fn deref(&self) -> &JaegerSpan {
|
fn deref(&self) -> &Span {
|
||||||
&self.span
|
&self.span
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -169,15 +169,24 @@ impl std::ops::Deref for PerLeafSpan {
|
|||||||
#[repr(u8)]
|
#[repr(u8)]
|
||||||
#[non_exhaustive]
|
#[non_exhaustive]
|
||||||
pub enum Stage {
|
pub enum Stage {
|
||||||
Backing = 1,
|
CandidateSelection = 1,
|
||||||
Availability = 2,
|
CandidateBacking = 2,
|
||||||
// TODO expand this
|
StatementDistribution = 3,
|
||||||
|
PoVDistribution = 4,
|
||||||
|
AvailabilityDistribution = 5,
|
||||||
|
AvailabilityRecovery = 6,
|
||||||
|
BitfieldDistribution = 7,
|
||||||
|
// Expand as needed, numbers should be ascending according to the stage
|
||||||
|
// through the inclusion pipeline, or according to the descriptions
|
||||||
|
// in [the path of a para chain block]
|
||||||
|
// (https://polkadot.network/the-path-of-a-parachain-block/)
|
||||||
|
// see [issue](https://github.com/paritytech/polkadot/issues/2389)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Builder pattern for children and root spans to unify
|
/// Builder pattern for children and root spans to unify
|
||||||
/// information annotations.
|
/// information annotations.
|
||||||
pub struct SpanBuilder {
|
pub struct SpanBuilder {
|
||||||
span: JaegerSpan,
|
span: Span,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SpanBuilder {
|
impl SpanBuilder {
|
||||||
@@ -196,8 +205,8 @@ impl SpanBuilder {
|
|||||||
/// Attach a candidate stage.
|
/// Attach a candidate stage.
|
||||||
/// Should always come with a `CandidateHash`.
|
/// Should always come with a `CandidateHash`.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn with_candidate_stage(mut self, stage: Stage) -> Self {
|
pub fn with_stage(mut self, stage: Stage) -> Self {
|
||||||
self.span.add_string_tag("candidate-stage", &format!("{}", stage as u8));
|
self.span.add_stage(stage);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -228,9 +237,15 @@ impl SpanBuilder {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn with_pov(mut self, pov: &PoV) -> Self {
|
||||||
|
self.span.add_pov(pov);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Complete construction.
|
/// Complete construction.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn build(self) -> JaegerSpan {
|
pub fn build(self) -> Span {
|
||||||
self.span
|
self.span
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -239,14 +254,14 @@ impl SpanBuilder {
|
|||||||
/// A wrapper type for a span.
|
/// A wrapper type for a span.
|
||||||
///
|
///
|
||||||
/// Handles running with and without jaeger.
|
/// Handles running with and without jaeger.
|
||||||
pub enum JaegerSpan {
|
pub enum Span {
|
||||||
/// Running with jaeger being enabled.
|
/// Running with jaeger being enabled.
|
||||||
Enabled(mick_jaeger::Span),
|
Enabled(mick_jaeger::Span),
|
||||||
/// Running with jaeger disabled.
|
/// Running with jaeger disabled.
|
||||||
Disabled,
|
Disabled,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JaegerSpan {
|
impl Span {
|
||||||
/// Derive a child span from `self`.
|
/// Derive a child span from `self`.
|
||||||
pub fn child(&self, name: &'static str) -> Self {
|
pub fn child(&self, name: &'static str) -> Self {
|
||||||
match self {
|
match self {
|
||||||
@@ -276,6 +291,18 @@ impl JaegerSpan {
|
|||||||
self.child_builder(name).with_candidate(candidate_hash).build()
|
self.child_builder(name).with_candidate(candidate_hash).build()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add candidate stage annotation.
|
||||||
|
pub fn add_stage(&mut self, stage: Stage) {
|
||||||
|
self.add_string_tag("candidate-stage", &format!("{}", stage as u8));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_pov(&mut self, pov: &PoV) {
|
||||||
|
if self.is_enabled() {
|
||||||
|
// avoid computing the pov hash if jaeger is not enabled
|
||||||
|
self.add_string_tag("pov", &format!("{:?}", pov.hash()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Add an additional tag to the span.
|
/// Add an additional tag to the span.
|
||||||
pub fn add_string_tag(&mut self, tag: &str, value: &str) {
|
pub fn add_string_tag(&mut self, tag: &str, value: &str) {
|
||||||
match self {
|
match self {
|
||||||
@@ -291,15 +318,24 @@ impl JaegerSpan {
|
|||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Helper to check whether jaeger is enabled
|
||||||
|
/// in order to avoid computational overhead.
|
||||||
|
pub const fn is_enabled(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
Span::Enabled(_) => true,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for JaegerSpan {
|
impl std::fmt::Debug for Span {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
write!(f, "<jaeger span>")
|
write!(f, "<jaeger span>")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Option<mick_jaeger::Span>> for JaegerSpan {
|
impl From<Option<mick_jaeger::Span>> for Span {
|
||||||
fn from(src: Option<mick_jaeger::Span>) -> Self {
|
fn from(src: Option<mick_jaeger::Span>) -> Self {
|
||||||
if let Some(span) = src {
|
if let Some(span) = src {
|
||||||
Self::Enabled(span)
|
Self::Enabled(span)
|
||||||
@@ -309,15 +345,15 @@ impl From<Option<mick_jaeger::Span>> for JaegerSpan {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<mick_jaeger::Span> for JaegerSpan {
|
impl From<mick_jaeger::Span> for Span {
|
||||||
fn from(src: mick_jaeger::Span) -> Self {
|
fn from(src: mick_jaeger::Span) -> Self {
|
||||||
Self::Enabled(src)
|
Self::Enabled(src)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shortcut for [`hash_span`] with the hash of the `Candidate` block.
|
/// Shortcut for [`hash_span`] with the hash of the `Candidate` block.
|
||||||
pub fn candidate_hash_span(candidate_hash: &CandidateHash, span_name: &'static str) -> JaegerSpan {
|
pub fn candidate_hash_span(candidate_hash: &CandidateHash, span_name: &'static str) -> Span {
|
||||||
let mut span: JaegerSpan = INSTANCE.read_recursive()
|
let mut span: Span = INSTANCE.read_recursive()
|
||||||
.span(|| { candidate_hash.0 }, span_name).into();
|
.span(|| { candidate_hash.0 }, span_name).into();
|
||||||
|
|
||||||
span.add_string_tag("candidate-hash", &format!("{:?}", candidate_hash.0));
|
span.add_string_tag("candidate-hash", &format!("{:?}", candidate_hash.0));
|
||||||
@@ -326,8 +362,12 @@ pub fn candidate_hash_span(candidate_hash: &CandidateHash, span_name: &'static s
|
|||||||
|
|
||||||
/// Shortcut for [`hash_span`] with the hash of the `PoV`.
|
/// Shortcut for [`hash_span`] with the hash of the `PoV`.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn pov_span(pov: &PoV, span_name: &'static str) -> JaegerSpan {
|
pub fn pov_span(pov: &PoV, span_name: &'static str) -> Span {
|
||||||
INSTANCE.read_recursive().span(|| { pov.hash() }, span_name).into()
|
let mut span: Span = INSTANCE.read_recursive()
|
||||||
|
.span(|| { pov.hash() }, span_name).into();
|
||||||
|
|
||||||
|
span.add_pov(pov);
|
||||||
|
span
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a `Span` referring to the given hash. All spans created with [`hash_span`] with the
|
/// Creates a `Span` referring to the given hash. All spans created with [`hash_span`] with the
|
||||||
@@ -335,8 +375,8 @@ pub fn pov_span(pov: &PoV, span_name: &'static str) -> JaegerSpan {
|
|||||||
///
|
///
|
||||||
/// This span automatically has the `relay-parent` tag set.
|
/// This span automatically has the `relay-parent` tag set.
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
pub fn hash_span(hash: &Hash, span_name: &'static str) -> JaegerSpan {
|
pub fn hash_span(hash: &Hash, span_name: &'static str) -> Span {
|
||||||
let mut span: JaegerSpan = INSTANCE.read_recursive().span(|| { *hash }, span_name).into();
|
let mut span: Span = INSTANCE.read_recursive().span(|| { *hash }, span_name).into();
|
||||||
span.add_string_tag("relay-parent", &format!("{:?}", hash));
|
span.add_string_tag("relay-parent", &format!("{:?}", hash));
|
||||||
span
|
span
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ use polkadot_subsystem::messages::{
|
|||||||
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent
|
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent
|
||||||
};
|
};
|
||||||
use polkadot_subsystem::{
|
use polkadot_subsystem::{
|
||||||
jaeger, errors::{ChainApiError, RuntimeApiError}, PerLeafSpan,
|
jaeger, errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, Stage,
|
||||||
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError,
|
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError,
|
||||||
};
|
};
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
@@ -166,7 +166,7 @@ struct PerCandidate {
|
|||||||
live_in: HashSet<Hash>,
|
live_in: HashSet<Hash>,
|
||||||
|
|
||||||
/// A Jaeger span relating to this candidate.
|
/// A Jaeger span relating to this candidate.
|
||||||
span: jaeger::JaegerSpan,
|
span: jaeger::Span,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PerCandidate {
|
impl PerCandidate {
|
||||||
@@ -185,7 +185,7 @@ impl PerCandidate {
|
|||||||
fn drop_span_after_own_availability(&mut self) {
|
fn drop_span_after_own_availability(&mut self) {
|
||||||
if let Some(validator_index) = self.validator_index {
|
if let Some(validator_index) = self.validator_index {
|
||||||
if self.message_vault.contains_key(&validator_index) {
|
if self.message_vault.contains_key(&validator_index) {
|
||||||
self.span = jaeger::JaegerSpan::Disabled;
|
self.span = jaeger::Span::Disabled;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -251,7 +251,7 @@ impl ProtocolState {
|
|||||||
span: if validator_index.is_some() {
|
span: if validator_index.is_some() {
|
||||||
jaeger::candidate_hash_span(&receipt_hash, "pending-availability")
|
jaeger::candidate_hash_span(&receipt_hash, "pending-availability")
|
||||||
} else {
|
} else {
|
||||||
jaeger::JaegerSpan::Disabled
|
jaeger::Span::Disabled
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
@@ -262,7 +262,10 @@ impl ProtocolState {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Create some span that will make it able to switch between the candidate and relay parent span.
|
// Create some span that will make it able to switch between the candidate and relay parent span.
|
||||||
let span = per_relay_parent.span.child_with_candidate("live-candidate", &receipt_hash);
|
let span = per_relay_parent.span.child_builder("live-candidate")
|
||||||
|
.with_candidate(&receipt_hash)
|
||||||
|
.with_stage(Stage::AvailabilityDistribution)
|
||||||
|
.build();
|
||||||
candidate_entry.span.add_follows_from(&span);
|
candidate_entry.span.add_follows_from(&span);
|
||||||
candidate_entry.live_in.insert(relay_parent);
|
candidate_entry.live_in.insert(relay_parent);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ fn make_per_candidate() -> PerCandidate {
|
|||||||
validators: Vec::new(),
|
validators: Vec::new(),
|
||||||
validator_index: None,
|
validator_index: None,
|
||||||
descriptor: Default::default(),
|
descriptor: Default::default(),
|
||||||
span: jaeger::JaegerSpan::Disabled,
|
span: jaeger::Span::Disabled,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1018,13 +1018,13 @@ fn clean_up_receipts_cache_unions_ancestors_and_view() {
|
|||||||
state.per_relay_parent.insert(hash_a, PerRelayParent {
|
state.per_relay_parent.insert(hash_a, PerRelayParent {
|
||||||
ancestors: vec![hash_b],
|
ancestors: vec![hash_b],
|
||||||
live_candidates: HashSet::new(),
|
live_candidates: HashSet::new(),
|
||||||
span: PerLeafSpan::new(Arc::new(jaeger::JaegerSpan::Disabled), "test"),
|
span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
});
|
});
|
||||||
|
|
||||||
state.per_relay_parent.insert(hash_c, PerRelayParent {
|
state.per_relay_parent.insert(hash_c, PerRelayParent {
|
||||||
ancestors: Vec::new(),
|
ancestors: Vec::new(),
|
||||||
live_candidates: HashSet::new(),
|
live_candidates: HashSet::new(),
|
||||||
span: PerLeafSpan::new(Arc::new(jaeger::JaegerSpan::Disabled), "test"),
|
span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
});
|
});
|
||||||
|
|
||||||
state.clean_up_live_under_cache();
|
state.clean_up_live_under_cache();
|
||||||
@@ -1048,13 +1048,13 @@ fn remove_relay_parent_only_removes_per_candidate_if_final() {
|
|||||||
state.per_relay_parent.insert(hash_a, PerRelayParent {
|
state.per_relay_parent.insert(hash_a, PerRelayParent {
|
||||||
ancestors: vec![],
|
ancestors: vec![],
|
||||||
live_candidates: std::iter::once(candidate_hash_a).collect(),
|
live_candidates: std::iter::once(candidate_hash_a).collect(),
|
||||||
span: PerLeafSpan::new(Arc::new(jaeger::JaegerSpan::Disabled), "test"),
|
span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
});
|
});
|
||||||
|
|
||||||
state.per_relay_parent.insert(hash_b, PerRelayParent {
|
state.per_relay_parent.insert(hash_b, PerRelayParent {
|
||||||
ancestors: vec![],
|
ancestors: vec![],
|
||||||
live_candidates: std::iter::once(candidate_hash_a).collect(),
|
live_candidates: std::iter::once(candidate_hash_a).collect(),
|
||||||
span: PerLeafSpan::new(Arc::new(jaeger::JaegerSpan::Disabled), "test"),
|
span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
});
|
});
|
||||||
|
|
||||||
state.per_candidate.insert(candidate_hash_a, {
|
state.per_candidate.insert(candidate_hash_a, {
|
||||||
@@ -1099,7 +1099,7 @@ fn add_relay_parent_includes_all_live_candidates() {
|
|||||||
None,
|
None,
|
||||||
candidates,
|
candidates,
|
||||||
vec![ancestor_a],
|
vec![ancestor_a],
|
||||||
PerLeafSpan::new(Arc::new(jaeger::JaegerSpan::Disabled), "test"),
|
PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
);
|
);
|
||||||
|
|
||||||
assert!(
|
assert!(
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ use polkadot_subsystem::{
|
|||||||
SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
|
SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
|
||||||
OverseerSignal, ActiveLeavesUpdate,
|
OverseerSignal, ActiveLeavesUpdate,
|
||||||
errors::RecoveryError,
|
errors::RecoveryError,
|
||||||
|
jaeger,
|
||||||
messages::{
|
messages::{
|
||||||
AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage,
|
AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage,
|
||||||
NetworkBridgeEvent,
|
NetworkBridgeEvent,
|
||||||
@@ -664,6 +665,9 @@ async fn handle_recover(
|
|||||||
) -> error::Result<()> {
|
) -> error::Result<()> {
|
||||||
let candidate_hash = receipt.hash();
|
let candidate_hash = receipt.hash();
|
||||||
|
|
||||||
|
let mut span = jaeger::candidate_hash_span(&candidate_hash, "availbility-recovery");
|
||||||
|
span.add_stage(jaeger::Stage::AvailabilityRecovery);
|
||||||
|
|
||||||
if let Some(result) = state.availability_lru.get(&candidate_hash) {
|
if let Some(result) = state.availability_lru.get(&candidate_hash) {
|
||||||
if let Err(e) = response_sender.send(result.clone()) {
|
if let Err(e) = response_sender.send(result.clone()) {
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
@@ -680,12 +684,14 @@ async fn handle_recover(
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let _span = span.child("not-cached");
|
||||||
let session_info = request_session_info_ctx(
|
let session_info = request_session_info_ctx(
|
||||||
state.live_block_hash,
|
state.live_block_hash,
|
||||||
session_index,
|
session_index,
|
||||||
ctx,
|
ctx,
|
||||||
).await?.await.map_err(error::Error::CanceledSessionInfo)??;
|
).await?.await.map_err(error::Error::CanceledSessionInfo)??;
|
||||||
|
|
||||||
|
let _span = span.child("session-info-ctx-received");
|
||||||
match session_info {
|
match session_info {
|
||||||
Some(session_info) => {
|
Some(session_info) => {
|
||||||
launch_interaction(
|
launch_interaction(
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ use polkadot_primitives::v1::{
|
|||||||
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
|
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
|
||||||
use polkadot_node_subsystem_util::TimeoutExt;
|
use polkadot_node_subsystem_util::TimeoutExt;
|
||||||
use polkadot_subsystem_testhelpers as test_helpers;
|
use polkadot_subsystem_testhelpers as test_helpers;
|
||||||
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent}, JaegerSpan};
|
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent}, jaeger};
|
||||||
|
|
||||||
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>;
|
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>;
|
||||||
|
|
||||||
@@ -537,7 +537,7 @@ fn availability_is_recovered_from_chunks_if_no_group_provided() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
|
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
|
||||||
deactivated: smallvec![],
|
deactivated: smallvec![],
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -601,7 +601,7 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
|
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
|
||||||
deactivated: smallvec![],
|
deactivated: smallvec![],
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -664,7 +664,7 @@ fn bad_merkle_path_leads_to_recovery_error() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
|
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
|
||||||
deactivated: smallvec![],
|
deactivated: smallvec![],
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -719,7 +719,7 @@ fn wrong_chunk_index_leads_to_recovery_error() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
|
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
|
||||||
deactivated: smallvec![],
|
deactivated: smallvec![],
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -790,7 +790,7 @@ fn invalid_erasure_coding_leads_to_invalid_error() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
|
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
|
||||||
deactivated: smallvec![],
|
deactivated: smallvec![],
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -831,7 +831,7 @@ fn fast_path_backing_group_recovers() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
|
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
|
||||||
deactivated: smallvec![],
|
deactivated: smallvec![],
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -876,7 +876,7 @@ fn wrong_data_from_fast_path_peer_leads_to_punishment() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
|
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
|
||||||
deactivated: smallvec![],
|
deactivated: smallvec![],
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -921,7 +921,7 @@ fn no_answers_in_fast_path_causes_chunk_requests() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: smallvec![(test_state.current.clone(), Arc::new(JaegerSpan::Disabled))],
|
activated: smallvec![(test_state.current.clone(), Arc::new(jaeger::Span::Disabled))],
|
||||||
deactivated: smallvec![],
|
deactivated: smallvec![],
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ use polkadot_subsystem::messages::*;
|
|||||||
use polkadot_subsystem::{
|
use polkadot_subsystem::{
|
||||||
PerLeafSpan, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
|
PerLeafSpan, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
|
||||||
SubsystemContext, SubsystemResult,
|
SubsystemContext, SubsystemResult,
|
||||||
|
jaeger,
|
||||||
};
|
};
|
||||||
use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
use polkadot_node_subsystem_util::metrics::{self, prometheus};
|
||||||
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
|
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
|
||||||
@@ -402,6 +403,7 @@ where
|
|||||||
.child_builder("msg-received")
|
.child_builder("msg-received")
|
||||||
.with_peer_id(&origin)
|
.with_peer_id(&origin)
|
||||||
.with_claimed_validator_index(message.signed_availability.validator_index())
|
.with_claimed_validator_index(message.signed_availability.validator_index())
|
||||||
|
.with_stage(jaeger::Stage::BitfieldDistribution)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
let validator_set = &job_data.validator_set;
|
let validator_set = &job_data.validator_set;
|
||||||
@@ -775,7 +777,7 @@ mod test {
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
|
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
|
||||||
use polkadot_subsystem::JaegerSpan;
|
use polkadot_subsystem::jaeger;
|
||||||
|
|
||||||
macro_rules! launch {
|
macro_rules! launch {
|
||||||
($fut:expr) => {
|
($fut:expr) => {
|
||||||
@@ -807,7 +809,7 @@ mod test {
|
|||||||
},
|
},
|
||||||
message_received_from_peer: hashmap!{},
|
message_received_from_peer: hashmap!{},
|
||||||
message_sent_to_peer: hashmap!{},
|
message_sent_to_peer: hashmap!{},
|
||||||
span: PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"),
|
span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
peer_views: peers
|
peer_views: peers
|
||||||
@@ -841,7 +843,7 @@ mod test {
|
|||||||
one_per_validator: hashmap!{},
|
one_per_validator: hashmap!{},
|
||||||
message_received_from_peer: hashmap!{},
|
message_received_from_peer: hashmap!{},
|
||||||
message_sent_to_peer: hashmap!{},
|
message_sent_to_peer: hashmap!{},
|
||||||
span: PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"),
|
span: PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
})
|
})
|
||||||
}).collect();
|
}).collect();
|
||||||
|
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ use futures::prelude::*;
|
|||||||
|
|
||||||
use polkadot_subsystem::{
|
use polkadot_subsystem::{
|
||||||
ActiveLeavesUpdate, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
|
ActiveLeavesUpdate, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
|
||||||
SubsystemResult, JaegerSpan,
|
SubsystemResult, jaeger,
|
||||||
};
|
};
|
||||||
use polkadot_subsystem::messages::{
|
use polkadot_subsystem::messages::{
|
||||||
NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
|
NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
|
||||||
@@ -155,7 +155,7 @@ where
|
|||||||
let mut event_stream = bridge.network_service.event_stream().fuse();
|
let mut event_stream = bridge.network_service.event_stream().fuse();
|
||||||
|
|
||||||
// Most recent heads are at the back.
|
// Most recent heads are at the back.
|
||||||
let mut live_heads: Vec<(Hash, Arc<JaegerSpan>)> = Vec::with_capacity(MAX_VIEW_HEADS);
|
let mut live_heads: Vec<(Hash, Arc<jaeger::Span>)> = Vec::with_capacity(MAX_VIEW_HEADS);
|
||||||
let mut local_view = View::default();
|
let mut local_view = View::default();
|
||||||
let mut finalized_number = 0;
|
let mut finalized_number = 0;
|
||||||
|
|
||||||
@@ -419,7 +419,7 @@ fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_n
|
|||||||
async fn update_our_view(
|
async fn update_our_view(
|
||||||
net: &mut impl Network,
|
net: &mut impl Network,
|
||||||
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
|
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
|
||||||
live_heads: &[(Hash, Arc<JaegerSpan>)],
|
live_heads: &[(Hash, Arc<jaeger::Span>)],
|
||||||
local_view: &mut View,
|
local_view: &mut View,
|
||||||
finalized_number: BlockNumber,
|
finalized_number: BlockNumber,
|
||||||
validation_peers: &HashMap<PeerId, PeerData>,
|
validation_peers: &HashMap<PeerId, PeerData>,
|
||||||
@@ -873,7 +873,7 @@ mod tests {
|
|||||||
let head = Hash::repeat_byte(1);
|
let head = Hash::repeat_byte(1);
|
||||||
virtual_overseer.send(
|
virtual_overseer.send(
|
||||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||||
ActiveLeavesUpdate::start_work(head, Arc::new(JaegerSpan::Disabled)),
|
ActiveLeavesUpdate::start_work(head, Arc::new(jaeger::Span::Disabled)),
|
||||||
))
|
))
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
@@ -928,7 +928,7 @@ mod tests {
|
|||||||
|
|
||||||
virtual_overseer.send(
|
virtual_overseer.send(
|
||||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||||
ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)),
|
ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)),
|
||||||
))
|
))
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
@@ -990,7 +990,7 @@ mod tests {
|
|||||||
// This should trigger the view update to our peers.
|
// This should trigger the view update to our peers.
|
||||||
virtual_overseer.send(
|
virtual_overseer.send(
|
||||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||||
ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)),
|
ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)),
|
||||||
))
|
))
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
@@ -1180,7 +1180,7 @@ mod tests {
|
|||||||
|
|
||||||
virtual_overseer.send(
|
virtual_overseer.send(
|
||||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||||
ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)),
|
ActiveLeavesUpdate::start_work(hash_a, Arc::new(jaeger::Span::Disabled)),
|
||||||
))
|
))
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
@@ -1372,7 +1372,7 @@ mod tests {
|
|||||||
).await;
|
).await;
|
||||||
virtual_overseer.send(
|
virtual_overseer.send(
|
||||||
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||||
ActiveLeavesUpdate::start_work(hash_b, Arc::new(JaegerSpan::Disabled)),
|
ActiveLeavesUpdate::start_work(hash_b, Arc::new(jaeger::Span::Disabled)),
|
||||||
))
|
))
|
||||||
).await;
|
).await;
|
||||||
|
|
||||||
|
|||||||
@@ -869,7 +869,7 @@ mod tests {
|
|||||||
ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
|
ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
|
||||||
SessionIndex, SessionInfo,
|
SessionIndex, SessionInfo,
|
||||||
};
|
};
|
||||||
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan};
|
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
|
||||||
use polkadot_node_subsystem_util::TimeoutExt;
|
use polkadot_node_subsystem_util::TimeoutExt;
|
||||||
use polkadot_subsystem_testhelpers as test_helpers;
|
use polkadot_subsystem_testhelpers as test_helpers;
|
||||||
use polkadot_node_network_protocol::{view, our_view};
|
use polkadot_node_network_protocol::{view, our_view};
|
||||||
@@ -1125,7 +1125,7 @@ mod tests {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
virtual_overseer,
|
virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: [(test_state.relay_parent, Arc::new(JaegerSpan::Disabled))][..].into(),
|
activated: [(test_state.relay_parent, Arc::new(jaeger::Span::Disabled))][..].into(),
|
||||||
deactivated: [][..].into(),
|
deactivated: [][..].into(),
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ use polkadot_primitives::v1::{
|
|||||||
Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV,
|
Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV,
|
||||||
};
|
};
|
||||||
use polkadot_subsystem::{
|
use polkadot_subsystem::{
|
||||||
jaeger, PerLeafSpan, JaegerSpan,
|
jaeger, PerLeafSpan,
|
||||||
FromOverseer, OverseerSignal, SubsystemContext,
|
FromOverseer, OverseerSignal, SubsystemContext,
|
||||||
messages::{
|
messages::{
|
||||||
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
|
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
|
||||||
@@ -598,7 +598,7 @@ async fn handle_our_view_change(
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let old_view = std::mem::replace(&mut state.view, view);
|
let old_view = std::mem::replace(&mut state.view, view);
|
||||||
|
|
||||||
let added: HashMap<Hash, Arc<JaegerSpan>> = state.view
|
let added: HashMap<Hash, Arc<jaeger::Span>> = state.view
|
||||||
.span_per_head()
|
.span_per_head()
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|v| !old_view.contains(&v.0))
|
.filter(|v| !old_view.contains(&v.0))
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ use polkadot_primitives::v1::{
|
|||||||
use polkadot_subsystem::{
|
use polkadot_subsystem::{
|
||||||
ActiveLeavesUpdate, OverseerSignal, SubsystemContext, SubsystemResult, SubsystemError, Subsystem,
|
ActiveLeavesUpdate, OverseerSignal, SubsystemContext, SubsystemResult, SubsystemError, Subsystem,
|
||||||
FromOverseer, SpawnedSubsystem,
|
FromOverseer, SpawnedSubsystem,
|
||||||
|
jaeger,
|
||||||
messages::{
|
messages::{
|
||||||
PoVDistributionMessage, AllMessages, NetworkBridgeMessage, NetworkBridgeEvent,
|
PoVDistributionMessage, AllMessages, NetworkBridgeMessage, NetworkBridgeEvent,
|
||||||
},
|
},
|
||||||
@@ -154,7 +155,11 @@ async fn handle_signal(
|
|||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
|
||||||
let _timer = state.metrics.time_handle_signal();
|
let _timer = state.metrics.time_handle_signal();
|
||||||
|
|
||||||
for (relay_parent, _span) in activated {
|
for (relay_parent, span) in activated {
|
||||||
|
let _span = span.child_builder("pov-dist")
|
||||||
|
.with_stage(jaeger::Stage::PoVDistribution)
|
||||||
|
.build();
|
||||||
|
|
||||||
match request_validators_ctx(relay_parent, ctx).await {
|
match request_validators_ctx(relay_parent, ctx).await {
|
||||||
Ok(vals_rx) => {
|
Ok(vals_rx) => {
|
||||||
let n_validators = match vals_rx.await? {
|
let n_validators = match vals_rx.await? {
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ use polkadot_primitives::v1::{
|
|||||||
AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId,
|
AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId,
|
||||||
ScheduledCore, ValidatorIndex, SessionIndex, SessionInfo,
|
ScheduledCore, ValidatorIndex, SessionIndex, SessionInfo,
|
||||||
};
|
};
|
||||||
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan};
|
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
|
||||||
use polkadot_node_subsystem_test_helpers as test_helpers;
|
use polkadot_node_subsystem_test_helpers as test_helpers;
|
||||||
use polkadot_node_subsystem_util::TimeoutExt;
|
use polkadot_node_subsystem_util::TimeoutExt;
|
||||||
use polkadot_node_network_protocol::{view, our_view};
|
use polkadot_node_network_protocol::{view, our_view};
|
||||||
@@ -277,7 +277,7 @@ fn ask_validators_for_povs() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: [(test_state.relay_parent, Arc::new(JaegerSpan::Disabled))][..].into(),
|
activated: [(test_state.relay_parent, Arc::new(jaeger::Span::Disabled))][..].into(),
|
||||||
deactivated: [][..].into(),
|
deactivated: [][..].into(),
|
||||||
}),
|
}),
|
||||||
).await;
|
).await;
|
||||||
@@ -449,7 +449,7 @@ fn ask_validators_for_povs() {
|
|||||||
overseer_signal(
|
overseer_signal(
|
||||||
&mut virtual_overseer,
|
&mut virtual_overseer,
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: [(next_leaf, Arc::new(JaegerSpan::Disabled))][..].into(),
|
activated: [(next_leaf, Arc::new(jaeger::Span::Disabled))][..].into(),
|
||||||
deactivated: [current.clone()][..].into(),
|
deactivated: [current.clone()][..].into(),
|
||||||
})
|
})
|
||||||
).await;
|
).await;
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ use std::{fmt, collections::HashMap};
|
|||||||
|
|
||||||
pub use sc_network::PeerId;
|
pub use sc_network::PeerId;
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub use polkadot_node_jaeger::JaegerSpan;
|
pub use polkadot_node_jaeger as jaeger;
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub use std::sync::Arc;
|
pub use std::sync::Arc;
|
||||||
|
|
||||||
@@ -118,16 +118,16 @@ macro_rules! impl_try_from {
|
|||||||
|
|
||||||
/// Specialized wrapper around [`View`].
|
/// Specialized wrapper around [`View`].
|
||||||
///
|
///
|
||||||
/// Besides the access to the view itself, it also gives access to the [`JaegerSpan`] per leave/head.
|
/// Besides the access to the view itself, it also gives access to the [`jaeger::Span`] per leave/head.
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct OurView {
|
pub struct OurView {
|
||||||
view: View,
|
view: View,
|
||||||
span_per_head: HashMap<Hash, Arc<JaegerSpan>>,
|
span_per_head: HashMap<Hash, Arc<jaeger::Span>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OurView {
|
impl OurView {
|
||||||
/// Creates a new instance.
|
/// Creates a new instance.
|
||||||
pub fn new(heads: impl IntoIterator<Item = (Hash, Arc<JaegerSpan>)>, finalized_number: BlockNumber) -> Self {
|
pub fn new(heads: impl IntoIterator<Item = (Hash, Arc<jaeger::Span>)>, finalized_number: BlockNumber) -> Self {
|
||||||
let state_per_head = heads.into_iter().collect::<HashMap<_, _>>();
|
let state_per_head = heads.into_iter().collect::<HashMap<_, _>>();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
@@ -142,7 +142,7 @@ impl OurView {
|
|||||||
/// Returns the span per head map.
|
/// Returns the span per head map.
|
||||||
///
|
///
|
||||||
/// For each head there exists one span in this map.
|
/// For each head there exists one span in this map.
|
||||||
pub fn span_per_head(&self) -> &HashMap<Hash, Arc<JaegerSpan>> {
|
pub fn span_per_head(&self) -> &HashMap<Hash, Arc<jaeger::Span>> {
|
||||||
&self.span_per_head
|
&self.span_per_head
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -161,7 +161,7 @@ impl std::ops::Deref for OurView {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Construct a new [`OurView`] with the given chain heads, finalized number 0 and disabled [`JaegerSpan`]'s.
|
/// Construct a new [`OurView`] with the given chain heads, finalized number 0 and disabled [`jaeger::Span`]'s.
|
||||||
///
|
///
|
||||||
/// NOTE: Use for tests only.
|
/// NOTE: Use for tests only.
|
||||||
///
|
///
|
||||||
@@ -176,7 +176,7 @@ impl std::ops::Deref for OurView {
|
|||||||
macro_rules! our_view {
|
macro_rules! our_view {
|
||||||
( $( $hash:expr ),* $(,)? ) => {
|
( $( $hash:expr ),* $(,)? ) => {
|
||||||
$crate::OurView::new(
|
$crate::OurView::new(
|
||||||
vec![ $( $hash.clone() ),* ].into_iter().map(|h| (h, $crate::Arc::new($crate::JaegerSpan::Disabled))),
|
vec![ $( $hash.clone() ),* ].into_iter().map(|h| (h, $crate::Arc::new($crate::jaeger::Span::Disabled))),
|
||||||
0,
|
0,
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -25,6 +25,7 @@
|
|||||||
use polkadot_subsystem::{
|
use polkadot_subsystem::{
|
||||||
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
|
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
|
||||||
ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan,
|
ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan,
|
||||||
|
jaeger,
|
||||||
messages::{
|
messages::{
|
||||||
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage,
|
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage,
|
||||||
RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent,
|
RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent,
|
||||||
@@ -516,10 +517,10 @@ async fn circulate_statement_and_dependents(
|
|||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
let _span = active_head.span.child_with_candidate(
|
let _span = active_head.span.child_builder("circulate-statement")
|
||||||
"circulate-statement",
|
.with_candidate(&statement.payload().candidate_hash())
|
||||||
&statement.payload().candidate_hash()
|
.with_stage(jaeger::Stage::StatementDistribution)
|
||||||
);
|
.build();
|
||||||
|
|
||||||
// First circulate the statement directly to all peers needing it.
|
// First circulate the statement directly to all peers needing it.
|
||||||
// The borrow of `active_head` needs to encompass only this (Rust) statement.
|
// The borrow of `active_head` needs to encompass only this (Rust) statement.
|
||||||
@@ -533,10 +534,14 @@ async fn circulate_statement_and_dependents(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let _span = _span.child("send-to-peers");
|
||||||
// Now send dependent statements to all peers needing them, if any.
|
// Now send dependent statements to all peers needing them, if any.
|
||||||
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
|
if let Some((candidate_hash, peers_needing_dependents)) = outputs {
|
||||||
for peer in peers_needing_dependents {
|
for peer in peers_needing_dependents {
|
||||||
if let Some(peer_data) = peers.get_mut(&peer) {
|
if let Some(peer_data) = peers.get_mut(&peer) {
|
||||||
|
let _span_loop = _span.child_builder("to-peer")
|
||||||
|
.with_peer_id(&peer)
|
||||||
|
.build();
|
||||||
// defensive: the peer data should always be some because the iterator
|
// defensive: the peer data should always be some because the iterator
|
||||||
// of peers is derived from the set of peers.
|
// of peers is derived from the set of peers.
|
||||||
send_statements_about(
|
send_statements_about(
|
||||||
@@ -1071,7 +1076,7 @@ mod tests {
|
|||||||
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
|
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
|
||||||
use sc_keystore::LocalKeystore;
|
use sc_keystore::LocalKeystore;
|
||||||
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
|
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
|
||||||
use polkadot_subsystem::JaegerSpan;
|
use polkadot_subsystem::jaeger;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn active_head_accepts_only_2_seconded_per_validator() {
|
fn active_head_accepts_only_2_seconded_per_validator() {
|
||||||
@@ -1112,7 +1117,7 @@ mod tests {
|
|||||||
let mut head_data = ActiveHeadData::new(
|
let mut head_data = ActiveHeadData::new(
|
||||||
validators,
|
validators,
|
||||||
session_index,
|
session_index,
|
||||||
PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"),
|
PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
|
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
|
||||||
@@ -1374,7 +1379,7 @@ mod tests {
|
|||||||
let mut data = ActiveHeadData::new(
|
let mut data = ActiveHeadData::new(
|
||||||
validators,
|
validators,
|
||||||
session_index,
|
session_index,
|
||||||
PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"),
|
PerLeafSpan::new(Arc::new(jaeger::Span::Disabled), "test"),
|
||||||
);
|
);
|
||||||
|
|
||||||
let noted = data.note_statement(block_on(SignedFullStatement::sign(
|
let noted = data.note_statement(block_on(SignedFullStatement::sign(
|
||||||
@@ -1627,7 +1632,7 @@ mod tests {
|
|||||||
let test_fut = async move {
|
let test_fut = async move {
|
||||||
// register our active heads.
|
// register our active heads.
|
||||||
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: vec![(hash_a, Arc::new(JaegerSpan::Disabled))].into(),
|
activated: vec![(hash_a, Arc::new(jaeger::Span::Disabled))].into(),
|
||||||
deactivated: vec![].into(),
|
deactivated: vec![].into(),
|
||||||
}))).await;
|
}))).await;
|
||||||
|
|
||||||
|
|||||||
@@ -90,7 +90,7 @@ use polkadot_subsystem::messages::{
|
|||||||
};
|
};
|
||||||
pub use polkadot_subsystem::{
|
pub use polkadot_subsystem::{
|
||||||
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
|
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
|
||||||
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, JaegerSpan, jaeger,
|
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, jaeger,
|
||||||
};
|
};
|
||||||
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome};
|
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}, metered, Metronome};
|
||||||
use polkadot_node_primitives::SpawnNamed;
|
use polkadot_node_primitives::SpawnNamed;
|
||||||
@@ -580,8 +580,8 @@ pub struct Overseer<S> {
|
|||||||
/// External listeners waiting for a hash to be in the active-leave set.
|
/// External listeners waiting for a hash to be in the active-leave set.
|
||||||
activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,
|
activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,
|
||||||
|
|
||||||
/// Stores the [`JaegerSpan`] per active leaf.
|
/// Stores the [`jaeger::Span`] per active leaf.
|
||||||
span_per_active_leaf: HashMap<Hash, Arc<JaegerSpan>>,
|
span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,
|
||||||
|
|
||||||
/// A set of leaves that `Overseer` starts working with.
|
/// A set of leaves that `Overseer` starts working with.
|
||||||
///
|
///
|
||||||
@@ -1913,7 +1913,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
|
||||||
fn on_head_activated(&mut self, hash: &Hash, parent_hash: Option<Hash>) -> Arc<JaegerSpan> {
|
fn on_head_activated(&mut self, hash: &Hash, parent_hash: Option<Hash>) -> Arc<jaeger::Span> {
|
||||||
self.metrics.on_head_activated();
|
self.metrics.on_head_activated();
|
||||||
if let Some(listeners) = self.activation_external_listeners.remove(hash) {
|
if let Some(listeners) = self.activation_external_listeners.remove(hash) {
|
||||||
for listener in listeners {
|
for listener in listeners {
|
||||||
@@ -2026,7 +2026,7 @@ mod tests {
|
|||||||
use futures::{executor, pin_mut, select, FutureExt, pending};
|
use futures::{executor, pin_mut, select, FutureExt, pending};
|
||||||
|
|
||||||
use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash};
|
use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash};
|
||||||
use polkadot_subsystem::{messages::RuntimeApiRequest, messages::NetworkBridgeEvent, JaegerSpan};
|
use polkadot_subsystem::{messages::RuntimeApiRequest, messages::NetworkBridgeEvent, jaeger};
|
||||||
use polkadot_node_primitives::{CollationResult, CollationGenerationConfig};
|
use polkadot_node_primitives::{CollationResult, CollationGenerationConfig};
|
||||||
use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange};
|
use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange};
|
||||||
use polkadot_node_subsystem_util::metered;
|
use polkadot_node_subsystem_util::metered;
|
||||||
@@ -2393,14 +2393,14 @@ mod tests {
|
|||||||
let expected_heartbeats = vec![
|
let expected_heartbeats = vec![
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
|
||||||
first_block_hash,
|
first_block_hash,
|
||||||
Arc::new(JaegerSpan::Disabled),
|
Arc::new(jaeger::Span::Disabled),
|
||||||
)),
|
)),
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: [(second_block_hash, Arc::new(JaegerSpan::Disabled))].as_ref().into(),
|
activated: [(second_block_hash, Arc::new(jaeger::Span::Disabled))].as_ref().into(),
|
||||||
deactivated: [first_block_hash].as_ref().into(),
|
deactivated: [first_block_hash].as_ref().into(),
|
||||||
}),
|
}),
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: [(third_block_hash, Arc::new(JaegerSpan::Disabled))].as_ref().into(),
|
activated: [(third_block_hash, Arc::new(jaeger::Span::Disabled))].as_ref().into(),
|
||||||
deactivated: [second_block_hash].as_ref().into(),
|
deactivated: [second_block_hash].as_ref().into(),
|
||||||
}),
|
}),
|
||||||
];
|
];
|
||||||
@@ -2489,8 +2489,8 @@ mod tests {
|
|||||||
let expected_heartbeats = vec![
|
let expected_heartbeats = vec![
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: [
|
activated: [
|
||||||
(first_block_hash, Arc::new(JaegerSpan::Disabled)),
|
(first_block_hash, Arc::new(jaeger::Span::Disabled)),
|
||||||
(second_block_hash, Arc::new(JaegerSpan::Disabled)),
|
(second_block_hash, Arc::new(jaeger::Span::Disabled)),
|
||||||
].as_ref().into(),
|
].as_ref().into(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}),
|
}),
|
||||||
@@ -2577,7 +2577,7 @@ mod tests {
|
|||||||
let expected_heartbeats = vec![
|
let expected_heartbeats = vec![
|
||||||
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
|
||||||
activated: [
|
activated: [
|
||||||
(imported_block.hash, Arc::new(JaegerSpan::Disabled)),
|
(imported_block.hash, Arc::new(jaeger::Span::Disabled)),
|
||||||
].as_ref().into(),
|
].as_ref().into(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ use polkadot_node_subsystem::{
|
|||||||
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent},
|
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent},
|
||||||
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
|
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
|
||||||
};
|
};
|
||||||
use polkadot_node_jaeger::JaegerSpan;
|
use polkadot_node_jaeger as jaeger;
|
||||||
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
|
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
|
||||||
use futures_timer::Delay;
|
use futures_timer::Delay;
|
||||||
use parity_scale_codec::Encode;
|
use parity_scale_codec::Encode;
|
||||||
@@ -481,7 +481,7 @@ pub trait JobTrait: Unpin {
|
|||||||
/// The job should be ended when `receiver` returns `None`.
|
/// The job should be ended when `receiver` returns `None`.
|
||||||
fn run(
|
fn run(
|
||||||
parent: Hash,
|
parent: Hash,
|
||||||
span: Arc<JaegerSpan>,
|
span: Arc<jaeger::Span>,
|
||||||
run_args: Self::RunArgs,
|
run_args: Self::RunArgs,
|
||||||
metrics: Self::Metrics,
|
metrics: Self::Metrics,
|
||||||
receiver: mpsc::Receiver<Self::ToJob>,
|
receiver: mpsc::Receiver<Self::ToJob>,
|
||||||
@@ -552,7 +552,7 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
|
|||||||
fn spawn_job(
|
fn spawn_job(
|
||||||
&mut self,
|
&mut self,
|
||||||
parent_hash: Hash,
|
parent_hash: Hash,
|
||||||
span: Arc<JaegerSpan>,
|
span: Arc<jaeger::Span>,
|
||||||
run_args: Job::RunArgs,
|
run_args: Job::RunArgs,
|
||||||
metrics: Job::Metrics,
|
metrics: Job::Metrics,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
@@ -1045,9 +1045,10 @@ mod tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use executor::block_on;
|
use executor::block_on;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
use polkadot_node_jaeger as jaeger;
|
||||||
use polkadot_node_subsystem::{
|
use polkadot_node_subsystem::{
|
||||||
messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
|
messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
|
||||||
SpawnedSubsystem, JaegerSpan,
|
SpawnedSubsystem,
|
||||||
};
|
};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
|
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
|
||||||
@@ -1089,7 +1090,7 @@ mod tests {
|
|||||||
// this function is in charge of creating and executing the job's main loop
|
// this function is in charge of creating and executing the job's main loop
|
||||||
fn run(
|
fn run(
|
||||||
_: Hash,
|
_: Hash,
|
||||||
_: Arc<JaegerSpan>,
|
_: Arc<jaeger::Span>,
|
||||||
run_args: Self::RunArgs,
|
run_args: Self::RunArgs,
|
||||||
_metrics: Self::Metrics,
|
_metrics: Self::Metrics,
|
||||||
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
receiver: mpsc::Receiver<CandidateSelectionMessage>,
|
||||||
@@ -1173,7 +1174,7 @@ mod tests {
|
|||||||
test_harness(true, |mut overseer_handle, err_rx| async move {
|
test_harness(true, |mut overseer_handle, err_rx| async move {
|
||||||
overseer_handle
|
overseer_handle
|
||||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||||
ActiveLeavesUpdate::start_work(relay_parent, Arc::new(JaegerSpan::Disabled)),
|
ActiveLeavesUpdate::start_work(relay_parent, Arc::new(jaeger::Span::Disabled)),
|
||||||
)))
|
)))
|
||||||
.await;
|
.await;
|
||||||
assert_matches!(
|
assert_matches!(
|
||||||
@@ -1202,7 +1203,7 @@ mod tests {
|
|||||||
test_harness(true, |mut overseer_handle, err_rx| async move {
|
test_harness(true, |mut overseer_handle, err_rx| async move {
|
||||||
overseer_handle
|
overseer_handle
|
||||||
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
|
||||||
ActiveLeavesUpdate::start_work(relay_parent, Arc::new(JaegerSpan::Disabled)),
|
ActiveLeavesUpdate::start_work(relay_parent, Arc::new(jaeger::Span::Disabled)),
|
||||||
)))
|
)))
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -51,18 +51,18 @@ const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;
|
|||||||
/// Note that the activated and deactivated fields indicate deltas, not complete sets.
|
/// Note that the activated and deactivated fields indicate deltas, not complete sets.
|
||||||
#[derive(Clone, Default)]
|
#[derive(Clone, Default)]
|
||||||
pub struct ActiveLeavesUpdate {
|
pub struct ActiveLeavesUpdate {
|
||||||
/// New relay chain block hashes of interest and their associated [`JaegerSpan`].
|
/// New relay chain block hashes of interest and their associated [`jaeger::Span`].
|
||||||
///
|
///
|
||||||
/// NOTE: Each span should only be kept active as long as the leaf is considered active and should be dropped
|
/// NOTE: Each span should only be kept active as long as the leaf is considered active and should be dropped
|
||||||
/// when the leaf is deactivated.
|
/// when the leaf is deactivated.
|
||||||
pub activated: SmallVec<[(Hash, Arc<JaegerSpan>); ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
|
pub activated: SmallVec<[(Hash, Arc<jaeger::Span>); ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
|
||||||
/// Relay chain block hashes no longer of interest.
|
/// Relay chain block hashes no longer of interest.
|
||||||
pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
|
pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ActiveLeavesUpdate {
|
impl ActiveLeavesUpdate {
|
||||||
/// Create a ActiveLeavesUpdate with a single activated hash
|
/// Create a ActiveLeavesUpdate with a single activated hash
|
||||||
pub fn start_work(hash: Hash, span: Arc<JaegerSpan>) -> Self {
|
pub fn start_work(hash: Hash, span: Arc<jaeger::Span>) -> Self {
|
||||||
Self { activated: [(hash, span)][..].into(), ..Default::default() }
|
Self { activated: [(hash, span)][..].into(), ..Default::default() }
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,7 +90,7 @@ impl PartialEq for ActiveLeavesUpdate {
|
|||||||
|
|
||||||
impl fmt::Debug for ActiveLeavesUpdate {
|
impl fmt::Debug for ActiveLeavesUpdate {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
struct Activated<'a>(&'a [(Hash, Arc<JaegerSpan>)]);
|
struct Activated<'a>(&'a [(Hash, Arc<jaeger::Span>)]);
|
||||||
impl fmt::Debug for Activated<'_> {
|
impl fmt::Debug for Activated<'_> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
f.debug_list().entries(self.0.iter().map(|e| e.0)).finish()
|
f.debug_list().entries(self.0.iter().map(|e| e.0)).finish()
|
||||||
|
|||||||
Reference in New Issue
Block a user