diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 2cebe26d7d..c7b82b3c55 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4969,7 +4969,6 @@ dependencies = [ "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", - "smallvec 1.5.1", "sp-core", "sp-keyring", "thiserror", @@ -5057,7 +5056,6 @@ dependencies = [ "polkadot-overseer", "polkadot-primitives", "sc-service", - "smallvec 1.5.1", "sp-core", "thiserror", "tracing", @@ -5208,11 +5206,27 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "polkadot-node-jaeger" +version = "0.1.0" +dependencies = [ + "async-std", + "lazy_static", + "log", + "mick-jaeger", + "parking_lot 0.11.1", + "polkadot-primitives", + "sc-network", + "sp-core", + "thiserror", +] + [[package]] name = "polkadot-node-network-protocol" version = "0.1.0" dependencies = [ "parity-scale-codec", + "polkadot-node-jaeger", "polkadot-node-primitives", "polkadot-primitives", "sc-network", @@ -5247,6 +5261,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.11.1", "pin-project 1.0.2", + "polkadot-node-jaeger", "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem-test-helpers", @@ -5297,6 +5312,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.11.1", "pin-project 1.0.2", + "polkadot-node-jaeger", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", @@ -5368,7 +5384,6 @@ dependencies = [ "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", - "smallvec 1.5.1", "sp-core", "sp-keyring", "thiserror", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index 0c6d23be7d..a3d026e729 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -65,6 +65,7 @@ members = [ "node/subsystem", "node/subsystem-test-helpers", "node/subsystem-util", + "node/jaeger", "node/test/client", "node/test/service", "parachain/test-parachains", diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 7796a967c1..79c08eed27 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -126,12 +126,17 @@ impl CollationGenerationSubsystem { // follow the procedure from the guide if let Some(config) = &self.config { let metrics = self.metrics.clone(); - if let Err(err) = - handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await - { + if let Err(err) = handle_new_activations( + config.clone(), + activated.into_iter().map(|v| v.0), + ctx, + metrics, + sender, + ).await { tracing::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activations"); - }; + } } + false } Ok(Signal(Conclude)) => true, @@ -164,10 +169,10 @@ where Context: SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { - let future = Box::pin(async move { + let future = async move { self.run(ctx).await; Ok(()) - }); + }.boxed(); SpawnedSubsystem { name: "collation-generation-subsystem", @@ -176,10 +181,10 @@ where } } -#[tracing::instrument(level = "trace", skip(ctx, metrics, sender), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, metrics, sender, activated), fields(subsystem = LOG_TARGET))] async fn handle_new_activations( config: Arc, - activated: &[Hash], + activated: impl IntoIterator, ctx: &mut Context, metrics: Metrics, sender: &mpsc::Sender, @@ -189,11 +194,9 @@ async fn handle_new_activations( let _overall_timer = metrics.time_new_activations(); - for relay_parent in activated.iter().copied() { + for relay_parent in activated { let _relay_parent_timer = metrics.time_new_activations_relay_parent(); - // double-future magic happens here: the first layer of requests takes a mutable borrow of the context, and - // returns a receiver. The second layer of requests actually polls those receivers to completion. let (availability_cores, validators) = join!( request_availability_cores_ctx(relay_parent, ctx).await?, request_validators_ctx(relay_parent, ctx).await?, @@ -544,7 +547,7 @@ mod tests { subsystem_test_harness(overseer, |mut ctx| async move { handle_new_activations( test_config(123u32), - &subsystem_activated_hashes, + subsystem_activated_hashes, &mut ctx, Metrics(None), &tx, @@ -623,7 +626,7 @@ mod tests { let (tx, _rx) = mpsc::channel(0); subsystem_test_harness(overseer, |mut ctx| async move { - handle_new_activations(test_config(16), &activated_hashes, &mut ctx, Metrics(None), &tx) + handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx) .await .unwrap(); }); @@ -700,7 +703,7 @@ mod tests { let sent_messages = Arc::new(Mutex::new(Vec::new())); let subsystem_sent_messages = sent_messages.clone(); subsystem_test_harness(overseer, |mut ctx| async move { - handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, Metrics(None), &tx) + handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None), &tx) .await .unwrap(); diff --git a/polkadot/node/core/av-store/Cargo.toml b/polkadot/node/core/av-store/Cargo.toml index ef4daaaf9d..607b851d23 100644 --- a/polkadot/node/core/av-store/Cargo.toml +++ b/polkadot/node/core/av-store/Cargo.toml @@ -26,7 +26,6 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master log = "0.4.11" env_logger = "0.8.2" assert_matches = "1.4.0" -smallvec = "1.5.1" kvdb-memorydb = "0.7.0" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 66bec12142..77f5dfb0c5 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -534,7 +534,7 @@ where FromOverseer::Signal(OverseerSignal::ActiveLeaves( ActiveLeavesUpdate { activated, .. }) ) => { - for activated in activated.into_iter() { + for (activated, _span) in activated.into_iter() { process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?; } } diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index 330e0c529a..1d3bb26d4e 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -23,7 +23,6 @@ use futures::{ executor, Future, }; -use smallvec::smallvec; use polkadot_primitives::v1::{ AvailableData, BlockData, CandidateDescriptor, CandidateReceipt, HeadData, @@ -31,7 +30,7 @@ use polkadot_primitives::v1::{ }; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem::{ - ActiveLeavesUpdate, errors::RuntimeApiError, + ActiveLeavesUpdate, errors::RuntimeApiError, JaegerSpan, }; use polkadot_node_subsystem_test_helpers as test_helpers; @@ -182,8 +181,8 @@ fn runtime_api_error_does_not_stop_the_subsystem() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![new_leaf.clone()], - deactivated: smallvec![], + activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(), + deactivated: vec![].into(), }), ).await; @@ -516,8 +515,8 @@ fn stored_data_kept_until_finalized() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![new_leaf.clone()], - deactivated: smallvec![], + activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(), + deactivated: vec![].into(), }), ).await; @@ -620,8 +619,8 @@ fn stored_chunk_kept_until_finalized() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![new_leaf.clone()], - deactivated: smallvec![], + activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(), + deactivated: vec![].into(), }), ).await; @@ -758,8 +757,8 @@ fn forkfullness_works() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![new_leaf_1.clone(), new_leaf_2.clone()], - deactivated: smallvec![], + activated: vec![(new_leaf_1, Arc::new(JaegerSpan::Disabled)), (new_leaf_2, Arc::new(JaegerSpan::Disabled))].into(), + deactivated: vec![].into(), }), ).await; diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 8ac7df309f..d4973e48b3 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -37,7 +37,7 @@ use polkadot_node_primitives::{ FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult, }; use polkadot_subsystem::{ - jaeger::{self, JaegerSpan}, + JaegerSpan, PerLeafSpan, messages::{ AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage, PoVDistributionMessage, ProvisionableData, @@ -923,9 +923,10 @@ impl util::JobTrait for CandidateBackingJob { const NAME: &'static str = "CandidateBackingJob"; - #[tracing::instrument(skip(keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(skip(span, keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))] fn run( parent: Hash, + span: Arc, keystore: SyncCryptoStorePtr, metrics: Metrics, rx_to: mpsc::Receiver, @@ -952,7 +953,7 @@ impl util::JobTrait for CandidateBackingJob { } } - let span = jaeger::hash_span(&parent, "run:backing"); + let span = PerLeafSpan::new(span, "backing"); let _span = span.child("runtime-apis"); let (validators, groups, session_index, cores) = futures::try_join!( @@ -1340,7 +1341,10 @@ mod tests { ) { // Start work on some new parent. virtual_overseer.send(FromOverseer::Signal( - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(test_state.relay_parent))) + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work( + test_state.relay_parent, + Arc::new(JaegerSpan::Disabled), + ))) ).await; // Check that subsystem job issues a request for a validator set. diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs b/polkadot/node/core/bitfield-signing/src/lib.rs index ce82695997..8c06f76424 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -23,7 +23,7 @@ use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future}; use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr}; use polkadot_node_subsystem::{ - jaeger, + jaeger, PerLeafSpan, JaegerSpan, messages::{ AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest, @@ -34,7 +34,7 @@ use polkadot_node_subsystem_util::{ self as util, JobManager, JobTrait, Validator, FromJobCommand, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; -use std::{pin::Pin, time::Duration, iter::FromIterator}; +use std::{pin::Pin, time::Duration, iter::FromIterator, sync::Arc}; use wasm_timer::{Delay, Instant}; /// Delay between starting a bitfield signing job and its attempting to create a bitfield. @@ -215,9 +215,10 @@ impl JobTrait for BitfieldSigningJob { const NAME: &'static str = "BitfieldSigningJob"; /// Run a job for the parent block indicated - #[tracing::instrument(skip(keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(skip(span, keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))] fn run( relay_parent: Hash, + span: Arc, keystore: Self::RunArgs, metrics: Self::Metrics, _receiver: mpsc::Receiver, @@ -225,7 +226,7 @@ impl JobTrait for BitfieldSigningJob { ) -> Pin> + Send>> { let metrics = metrics.clone(); async move { - let span = jaeger::hash_span(&relay_parent, "run:bitfield-signing"); + let span = PerLeafSpan::new(span, "bitfield-signing"); let _span = span.child("delay"); let wait_until = Instant::now() + JOB_DELAY; diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index de128722af..51eaa80a47 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -25,7 +25,7 @@ use futures::{ }; use sp_keystore::SyncCryptoStorePtr; use polkadot_node_subsystem::{ - jaeger, + jaeger, JaegerSpan, PerLeafSpan, errors::ChainApiError, messages::{ AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage, @@ -39,7 +39,7 @@ use polkadot_node_subsystem_util::{ use polkadot_primitives::v1::{ CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV, }; -use std::pin::Pin; +use std::{pin::Pin, sync::Arc}; use thiserror::Error; const LOG_TARGET: &'static str = "candidate_selection"; @@ -95,12 +95,13 @@ impl JobTrait for CandidateSelectionJob { #[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] fn run( relay_parent: Hash, + span: Arc, keystore: Self::RunArgs, metrics: Self::Metrics, receiver: mpsc::Receiver, mut sender: mpsc::Sender, ) -> Pin> + Send>> { - let span = jaeger::hash_span(&relay_parent, "candidate-selection:run"); + let span = PerLeafSpan::new(span, "candidate-selection"); async move { let _span = span.child("query-runtime"); let (groups, cores) = futures::try_join!( diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 7eb134fc2a..4cb3475fdd 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -25,8 +25,7 @@ use futures::{ prelude::*, }; use polkadot_node_subsystem::{ - errors::{ChainApiError, RuntimeApiError}, - jaeger, + errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, JaegerSpan, messages::{ AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, @@ -40,7 +39,7 @@ use polkadot_primitives::v1::{ BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption, SignedAvailabilityBitfield, ValidatorIndex, }; -use std::{pin::Pin, collections::BTreeMap}; +use std::{pin::Pin, collections::BTreeMap, sync::Arc}; use thiserror::Error; use futures_timer::Delay; @@ -140,9 +139,10 @@ impl JobTrait for ProvisioningJob { /// Run a job for the parent block indicated // // this function is in charge of creating and executing the job's main loop - #[tracing::instrument(skip(_run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] + #[tracing::instrument(skip(span, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))] fn run( relay_parent: Hash, + span: Arc, _run_args: Self::RunArgs, metrics: Self::Metrics, receiver: mpsc::Receiver, @@ -156,11 +156,7 @@ impl JobTrait for ProvisioningJob { receiver, ); - let span = jaeger::hash_span(&relay_parent, "provisioner"); - - // it isn't necessary to break run_loop into its own function, - // but it's convenient to separate the concerns in this way - job.run_loop(&span).await + job.run_loop(PerLeafSpan::new(span, "provisioner")).await } .boxed() } @@ -186,7 +182,7 @@ impl ProvisioningJob { } } - async fn run_loop(mut self, span: &jaeger::JaegerSpan) -> Result<(), Error> { + async fn run_loop(mut self, span: PerLeafSpan) -> Result<(), Error> { use ProvisionerMessage::{ ProvisionableData, RequestBlockAuthorshipData, RequestInherentData, }; diff --git a/polkadot/node/jaeger/Cargo.toml b/polkadot/node/jaeger/Cargo.toml new file mode 100644 index 0000000000..df2528ac97 --- /dev/null +++ b/polkadot/node/jaeger/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "polkadot-node-jaeger" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" +description = "Polkadot Jaeger primitives" + +[dependencies] +async-std = "1.8.0" +mick-jaeger = "0.1.2" +lazy_static = "1.4" +parking_lot = "0.11.1" +polkadot-primitives = { path = "../../primitives" } +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +thiserror = "1.0.23" +log = "0.4.11" diff --git a/polkadot/node/subsystem/src/jaeger.rs b/polkadot/node/jaeger/src/lib.rs similarity index 82% rename from polkadot/node/subsystem/src/jaeger.rs rename to polkadot/node/jaeger/src/lib.rs index 9e2bb577f2..c99b498ad0 100644 --- a/polkadot/node/subsystem/src/jaeger.rs +++ b/polkadot/node/jaeger/src/lib.rs @@ -14,7 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Jaeger integration. +//! Polkadot Jaeger related primitives +//! +//! Provides primitives used by Polkadot for interfacing with Jaeger. +//! +//! # Integration //! //! See for an introduction. //! @@ -39,15 +43,22 @@ //! -p 9411:9411 \ //! docker.io/jaegertracing/all-in-one:1.21 //! ``` -//! -use polkadot_node_primitives::SpawnNamed; +use sp_core::traits::SpawnNamed; use polkadot_primitives::v1::{Hash, PoV, CandidateHash}; use parking_lot::RwLock; -use std::sync::Arc; -use std::result; -pub use crate::errors::JaegerError; +use std::{sync::Arc, result}; +/// A description of an error causing the chain API request to be unservable. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum JaegerError { + #[error("Already launched the collector thread")] + AlreadyLaunched, + + #[error("Missing jaeger configuration")] + MissingConfiguration, +} lazy_static::lazy_static! { static ref INSTANCE: RwLock = RwLock::new(Jaeger::None); @@ -102,6 +113,50 @@ impl JaegerConfigBuilder { } } +/// A special "per leaf span". +/// +/// Essentially this span wraps two spans: +/// +/// 1. The span that is created per leaf in the overseer. +/// 2. Some child span of the per-leaf span. +/// +/// This just works as auxiliary structure to easily store both. +#[derive(Debug)] +pub struct PerLeafSpan { + leaf_span: Arc, + span: JaegerSpan, +} + +impl PerLeafSpan { + /// Creates a new instance. + /// + /// Takes the `leaf_span` that is created by the overseer per leaf and a name for a child span. + /// Both will be stored in this object, while the child span is implicitly accessible by using the + /// [`Deref`](std::ops::Deref) implementation. + pub fn new(leaf_span: Arc, name: impl Into) -> Self { + let span = leaf_span.child(name); + + Self { + span, + leaf_span, + } + } + + /// Returns the leaf span. + pub fn leaf_span(&self) -> &Arc { + &self.leaf_span + } +} + +/// Returns a reference to the child span. +impl std::ops::Deref for PerLeafSpan { + type Target = JaegerSpan; + + fn deref(&self) -> &JaegerSpan { + &self.span + } +} + /// A wrapper type for a span. /// /// Handles running with and without jaeger. @@ -120,6 +175,7 @@ impl JaegerSpan { Self::Disabled => Self::Disabled, } } + /// Add an additional tag to the span. pub fn add_string_tag(&mut self, tag: &str, value: &str) { match self { diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index bf67965ca0..f07e48caab 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -32,7 +32,7 @@ use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; use polkadot_erasure_coding::branch_hash; use polkadot_node_network_protocol::{ - v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View, + v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View, OurView, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_primitives::v1::{ @@ -45,10 +45,8 @@ use polkadot_subsystem::messages::{ NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, }; use polkadot_subsystem::{ - jaeger, - errors::{ChainApiError, RuntimeApiError}, - ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, - SubsystemContext, SubsystemError, + jaeger, errors::{ChainApiError, RuntimeApiError}, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, }; use std::collections::{HashMap, HashSet}; use std::collections::hash_map::Entry; @@ -128,7 +126,7 @@ struct ProtocolState { peer_views: HashMap, /// Our own view. - view: View, + view: OurView, /// Caches a mapping of relay parents or ancestor to live candidate hashes. /// Allows fast intersection of live candidates with views and consecutive unioning. @@ -278,8 +276,8 @@ impl ProtocolState { } } - // Removes all entries from live_under which aren't referenced in the ancestry of - // one of our live relay-chain heads. + /// Removes all entries from live_under which aren't referenced in the ancestry of + /// one of our live relay-chain heads. fn clean_up_live_under_cache(&mut self) { let extended_view: HashSet<_> = self.per_relay_parent.iter() .map(|(r_hash, v)| v.ancestors.iter().cloned().chain(iter::once(*r_hash))) @@ -353,7 +351,7 @@ async fn handle_our_view_change( ctx: &mut Context, keystore: &SyncCryptoStorePtr, state: &mut ProtocolState, - view: View, + view: OurView, metrics: &Metrics, ) -> Result<()> where @@ -845,11 +843,11 @@ where } } -// Metadata about a candidate that is part of the live_candidates set. -// -// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This -// information is propagated to the higher level where it can be used to create data entries. Cached candidates -// already have entries associated with them, and thus don't need this metadata to be fetched. +/// Metadata about a candidate that is part of the live_candidates set. +/// +/// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This +/// information is propagated to the higher level where it can be used to create data entries. Cached candidates +/// already have entries associated with them, and thus don't need this metadata to be fetched. #[derive(Debug)] enum FetchedLiveCandidate { Cached, diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs index 2479c8ec6c..76a8566246 100644 --- a/polkadot/node/network/availability-distribution/src/tests.rs +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -17,7 +17,7 @@ use super::*; use assert_matches::assert_matches; use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; -use polkadot_node_network_protocol::{view, ObservedRole}; +use polkadot_node_network_protocol::{view, ObservedRole, our_view}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_primitives::v1::{ AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex, @@ -398,7 +398,7 @@ async fn expect_chunks_network_message( async fn change_our_view( virtual_overseer: &mut test_helpers::TestSubsystemContextHandle, - view: View, + view: OurView, validator_public: &[ValidatorId], ancestors: Vec, session_per_relay_parent: HashMap, @@ -574,7 +574,7 @@ fn check_views() { let genesis = Hash::repeat_byte(0xAA); change_our_view( &mut virtual_overseer, - view![current], + our_view![current], &validator_public, vec![ancestors[0], genesis], hashmap! { current => 1, genesis => 1 }, @@ -641,7 +641,7 @@ fn check_views() { peer_b_2 => view![ancestors[0]], }, ); - assert_eq!(view, view![current]); + assert_eq!(view, our_view![current]); } }; } @@ -676,7 +676,7 @@ fn reputation_verification() { change_our_view( &mut virtual_overseer, - view![current], + our_view![current], &validator_public, vec![ancestors[0]], hashmap! { current => 1 }, @@ -768,7 +768,7 @@ fn not_a_live_candidate_is_detected() { change_our_view( &mut virtual_overseer, - view![current], + our_view![current], &validator_public, vec![ancestors[0]], hashmap! { current => 1 }, @@ -816,7 +816,7 @@ fn peer_change_view_before_us() { change_our_view( &mut virtual_overseer, - view![current], + our_view![current], &validator_public, vec![ancestors[0]], hashmap! { current => 1 }, @@ -863,7 +863,7 @@ fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() { change_our_view( &mut virtual_overseer, - view![ancestors[0]], + our_view![ancestors[0]], &validator_public, vec![ancestors[1]], hashmap! { ancestors[0] => 1 }, @@ -879,7 +879,7 @@ fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() { change_our_view( &mut virtual_overseer, - view![current], + our_view![current], &validator_public, vec![ancestors[0]], hashmap! { current => 1 }, @@ -1218,7 +1218,7 @@ fn new_peer_gets_all_chunks_send() { change_our_view( &mut virtual_overseer, - view![current], + our_view![current], &validator_public, vec![ancestors[0]], hashmap! { current => 1 }, diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 39cacf1041..a771baf3ec 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -27,12 +27,12 @@ use futures::{channel::oneshot, FutureExt}; use polkadot_subsystem::messages::*; use polkadot_subsystem::{ - jaeger, - ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, + PerLeafSpan, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, + SubsystemResult, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; -use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange}; +use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange, OurView}; use std::collections::{HashMap, HashSet}; const COST_SIGNATURE_INVALID: ReputationChange = @@ -79,21 +79,21 @@ impl BitfieldGossipMessage { /// Data used to track information of peers and relay parents the /// overseer ordered us to work on. -#[derive(Default, Clone, Debug)] +#[derive(Default, Debug)] struct ProtocolState { /// track all active peers and their views /// to determine what is relevant to them. peer_views: HashMap, /// Our current view. - view: View, + view: OurView, /// Additional data particular to a relay parent. per_relay_parent: HashMap, } /// Data for a particular relay parent. -#[derive(Debug, Clone, Default)] +#[derive(Debug)] struct PerRelayParentData { /// Signing context for a particular relay parent. signing_context: SigningContext, @@ -113,9 +113,24 @@ struct PerRelayParentData { /// Track messages that were already received by a peer /// to prevent flooding. message_received_from_peer: HashMap>, + + /// The span for this leaf/relay parent. + span: PerLeafSpan, } impl PerRelayParentData { + /// Create a new instance. + fn new(signing_context: SigningContext, validator_set: Vec, span: PerLeafSpan) -> Self { + Self { + signing_context, + validator_set, + span, + one_per_validator: Default::default(), + message_sent_to_peer: Default::default(), + message_received_from_peer: Default::default(), + } + } + /// Determines if that particular message signed by a validator is needed by the given peer. fn message_from_validator_needed_by_peer( &self, @@ -176,12 +191,13 @@ impl BitfieldDistribution { // a network message was received handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await; } - FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => { + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => { let _timer = self.metrics.time_active_leaves_update(); - for relay_parent in activated { + for (relay_parent, span) in activated { tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "activated"); - let _span = jaeger::hash_span(&relay_parent, "bitfield-dist:active_leaves:basics"); + let span = PerLeafSpan::new(span, "bitfield-distribution"); + let _span = span.child("query-basics"); // query validator set and signing context per relay_parent once only match query_basics(&mut ctx, relay_parent).await { @@ -193,11 +209,7 @@ impl BitfieldDistribution { // us anything to do with this relay-parent anyway. let _ = state.per_relay_parent.insert( relay_parent, - PerRelayParentData { - signing_context, - validator_set, - ..Default::default() - }, + PerRelayParentData::new(signing_context, validator_set, span), ); } Err(e) => { @@ -206,11 +218,6 @@ impl BitfieldDistribution { _ => {}, } } - - for relay_parent in deactivated { - tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "deactivated"); - // defer the cleanup to the view change - } } FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => { tracing::trace!(target: LOG_TARGET, hash = %hash, number = %number, "block finalized"); @@ -310,7 +317,7 @@ async fn relay_message( where Context: SubsystemContext, { - let span = jaeger::hash_span(&message.relay_parent, "relay-msg"); + let span = job_data.span.child("relay-msg"); let _span = span.child("provisionable"); // notify the overseer about a new and valid signed bitfield @@ -398,6 +405,16 @@ where return; }; + let mut _span = { + let mut span = job_data.span.child("msg-received"); + span.add_string_tag("peer-id", &origin.to_base58()); + span.add_string_tag( + "claimed-validator", + &message.signed_availability.validator_index().to_string(), + ); + span + }; + let validator_set = &job_data.validator_set; if validator_set.is_empty() { tracing::trace!( @@ -495,16 +512,6 @@ where NetworkBridgeEvent::PeerMessage(remote, message) => { match message { protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => { - let mut _span = { - let mut span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received"); - span.add_string_tag("peer-id", &remote.to_base58()); - span.add_string_tag( - "claimed-validator", - &format!("{}", bitfield.validator_index()), - ); - span - }; - tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer"); let gossiped_bitfield = BitfieldGossipMessage { relay_parent, @@ -519,7 +526,7 @@ where /// Handle the changes necassary when our view changes. #[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))] -fn handle_our_view_change(state: &mut ProtocolState, view: View) { +fn handle_our_view_change(state: &mut ProtocolState, view: OurView) { let old_view = std::mem::replace(&mut (state.view), view); for added in state.view.difference(&old_view) { @@ -603,7 +610,7 @@ where return; }; - let _span = jaeger::hash_span(&message.relay_parent, "gossip"); + let _span = job_data.span.child("gossip"); job_data.message_sent_to_peer .entry(dest.clone()) @@ -778,7 +785,8 @@ mod test { use std::sync::Arc; use std::time::Duration; use assert_matches::assert_matches; - use polkadot_node_network_protocol::{view, ObservedRole}; + use polkadot_node_network_protocol::{view, ObservedRole, our_view}; + use polkadot_subsystem::JaegerSpan; macro_rules! launch { ($fut:expr) => { @@ -810,18 +818,19 @@ mod test { }, message_received_from_peer: hashmap!{}, message_sent_to_peer: hashmap!{}, + span: PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"), }, }, peer_views: peers .into_iter() .map(|peer| (peer, view!(relay_parent))) .collect(), - view: view!(relay_parent), + view: our_view!(relay_parent), } } fn state_with_view( - view: View, + view: OurView, relay_parent: Hash, ) -> (ProtocolState, SigningContext, SyncCryptoStorePtr, ValidatorId) { let mut state = ProtocolState::default(); @@ -843,6 +852,7 @@ mod test { one_per_validator: hashmap!{}, message_received_from_peer: hashmap!{}, message_sent_to_peer: hashmap!{}, + span: PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"), }) }).collect(); @@ -937,7 +947,7 @@ mod test { assert_ne!(peer_a, peer_b); // validator 0 key pair - let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone()); + let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash_a, hash_b], hash_a.clone()); state.peer_views.insert(peer_b.clone(), view![hash_a]); @@ -995,7 +1005,7 @@ mod test { assert_ne!(peer_a, peer_b); // validator 0 key pair - let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone()); + let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash_a, hash_b], hash_a.clone()); // create a signed message by validator 0 let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]); @@ -1110,7 +1120,7 @@ mod test { assert_ne!(peer_a, peer_b); // validator 0 key pair - let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash.clone()); + let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash.clone()); // create a signed message by validator 0 let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]); @@ -1206,7 +1216,7 @@ mod test { assert_ne!(peer_a, peer_b); // validator 0 key pair - let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone()); + let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash_a, hash_b], hash_a.clone()); // create a signed message by validator 0 let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]); @@ -1323,7 +1333,7 @@ mod test { )); // we are not interested in any peers at all anymore - state.view = view![]; + state.view = our_view![]; // on rx of the same message, since we are not interested, // should give penalty @@ -1365,7 +1375,7 @@ mod test { assert_ne!(peer_a, peer_b); // validator 0 key pair - let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash); + let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash); // create a signed message by validator 0 let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]); diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index e294b3dc43..c84d14d9fc 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -30,7 +30,7 @@ use sc_network::Event as NetworkEvent; use polkadot_subsystem::{ ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, - SubsystemResult, + SubsystemResult, JaegerSpan, }; use polkadot_subsystem::messages::{ NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage, @@ -39,7 +39,7 @@ use polkadot_subsystem::messages::{ }; use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash, BlockNumber}; use polkadot_node_network_protocol::{ - ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1 + ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView, }; use std::collections::{HashMap, hash_map}; @@ -47,7 +47,6 @@ use std::iter::ExactSizeIterator; use std::pin::Pin; use std::sync::Arc; - mod validator_discovery; /// The maximum amount of heads a peer is allowed to have in their view at any time. @@ -349,9 +348,9 @@ fn action_from_network_message(event: Option) -> Action { } } -fn construct_view(live_heads: &[Hash], finalized_number: BlockNumber) -> View { +fn construct_view(live_heads: impl DoubleEndedIterator, finalized_number: BlockNumber) -> View { View { - heads: live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect(), + heads: live_heads.rev().take(MAX_VIEW_HEADS).collect(), finalized_number } } @@ -360,13 +359,13 @@ fn construct_view(live_heads: &[Hash], finalized_number: BlockNumber) -> View { async fn update_our_view( net: &mut impl Network, ctx: &mut impl SubsystemContext, - live_heads: &[Hash], + live_heads: &[(Hash, Arc)], local_view: &mut View, finalized_number: BlockNumber, validation_peers: &HashMap, collation_peers: &HashMap, ) -> SubsystemResult<()> { - let new_view = construct_view(live_heads, finalized_number); + let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number); if *local_view == new_view { return Ok(()) } *local_view = new_view.clone(); @@ -380,18 +379,14 @@ async fn update_our_view( send_collation_message( net, collation_peers.keys().cloned(), - WireMessage::ViewUpdate(new_view.clone()), + WireMessage::ViewUpdate(new_view), ).await?; - dispatch_validation_event_to_all( - NetworkBridgeEvent::OurViewChange(new_view.clone()), - ctx, - ).await; + let our_view = OurView::new(live_heads.iter().cloned(), finalized_number); - dispatch_collation_event_to_all( - NetworkBridgeEvent::OurViewChange(new_view.clone()), - ctx, - ).await; + dispatch_validation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view.clone()), ctx).await; + + dispatch_collation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view), ctx).await; Ok(()) } @@ -584,7 +579,7 @@ where let mut event_stream = network_service.event_stream().fuse(); // Most recent heads are at the back. - let mut live_heads: Vec = Vec::with_capacity(MAX_VIEW_HEADS); + let mut live_heads: Vec<(Hash, Arc)> = Vec::with_capacity(MAX_VIEW_HEADS); let mut local_view = View::default(); let mut finalized_number = 0; @@ -642,7 +637,7 @@ where Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { live_heads.extend(activated); - live_heads.retain(|h| !deactivated.contains(h)); + live_heads.retain(|h| !deactivated.contains(&h.0)); update_our_view( &mut network_service, @@ -999,7 +994,9 @@ mod tests { let hash_a = Hash::repeat_byte(1); virtual_overseer.send( - FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a))) + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)), + )) ).await; let actions = network_handle.next_network_actions(2).await; @@ -1187,7 +1184,9 @@ mod tests { let hash_a = Hash::repeat_byte(1); virtual_overseer.send( - FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a))) + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)), + )) ).await; let actions = network_handle.next_network_actions(1).await; @@ -1378,7 +1377,9 @@ mod tests { FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_a, 1)) ).await; virtual_overseer.send( - FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_b))) + FromOverseer::Signal(OverseerSignal::ActiveLeaves( + ActiveLeavesUpdate::start_work(hash_b, Arc::new(JaegerSpan::Disabled)), + )) ).await; let actions = network_handle.next_network_actions(1).await; diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index 6160c778f2..618d77cba7 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -20,7 +20,6 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys log = "0.4.11" env_logger = "0.8.2" assert_matches = "1.4.0" -smallvec = "1.5.1" futures-timer = "3.0.2" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index ca6b1bfae0..ef061062ae 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -24,16 +24,11 @@ use polkadot_primitives::v1::{ CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, }; use polkadot_subsystem::{ - jaeger, + jaeger, PerLeafSpan, FromOverseer, OverseerSignal, SubsystemContext, - messages::{ - AllMessages, CollatorProtocolMessage, - NetworkBridgeMessage, - }, -}; -use polkadot_node_network_protocol::{ - v1 as protocol_v1, View, PeerId, NetworkBridgeEvent, RequestId, + messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage}, }; +use polkadot_node_network_protocol::{v1 as protocol_v1, View, PeerId, NetworkBridgeEvent, RequestId, OurView}; use polkadot_node_subsystem_util::{ validator_discovery, request_validators_ctx, @@ -188,7 +183,10 @@ struct State { peer_views: HashMap, /// Our own view. - view: View, + view: OurView, + + /// Span per relay parent. + span_per_relay_parent: HashMap, /// Possessed collations. /// @@ -431,7 +429,8 @@ async fn process_msg( state.collating_on = Some(id); } DistributeCollation(receipt, pov) => { - let _span1 = jaeger::hash_span(&receipt.descriptor.relay_parent, "distributing-collation"); + let _span1 = state.span_per_relay_parent + .get(&receipt.descriptor.relay_parent).map(|s| s.child("distributing-collation")); let _span2 = jaeger::pov_span(&pov, "distributing-collation"); match state.collating_on { Some(id) if receipt.descriptor.para_id != id => { @@ -542,12 +541,12 @@ async fn handle_incoming_peer_message( ); } RequestCollation(request_id, relay_parent, para_id) => { - let _span = jaeger::hash_span(&relay_parent, "rx-collation-request"); + let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("request-collation")); match state.collating_on { Some(our_para_id) => { if our_para_id == para_id { if let Some(collation) = state.collations.get(&relay_parent).cloned() { - let _span = _span.child("sending"); + let _span = _span.as_ref().map(|s| s.child("sending")); send_collation(ctx, state, request_id, origin, collation.0, collation.1).await; } } else { @@ -665,12 +664,13 @@ async fn handle_network_msg( #[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))] async fn handle_our_view_change( state: &mut State, - view: View, + view: OurView, ) -> Result<()> { for removed in state.view.difference(&view) { state.collations.remove(removed); state.our_validators_groups.remove(removed); state.connection_requests.remove(removed); + state.span_per_relay_parent.remove(removed); } state.view = view; @@ -725,11 +725,10 @@ pub(crate) async fn run( mod tests { use super::*; - use std::time::Duration; + use std::{time::Duration, sync::Arc}; use assert_matches::assert_matches; use futures::{executor, future, Future, channel::mpsc}; - use smallvec::smallvec; use sp_core::crypto::Pair; use sp_keyring::Sr25519Keyring; @@ -739,10 +738,10 @@ mod tests { ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId, SessionIndex, SessionInfo, }; - use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}}; + use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan}; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; - use polkadot_node_network_protocol::view; + use polkadot_node_network_protocol::{view, our_view}; #[derive(Default)] struct TestCandidateBuilder { @@ -888,17 +887,15 @@ mod tests { self.relay_parent.randomize(); } - let hashes = if merge_views { - vec![old_relay_parent, self.relay_parent] + let our_view = if merge_views { + our_view![old_relay_parent, self.relay_parent] } else { - vec![self.relay_parent] + our_view![self.relay_parent] }; overseer_send( virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(View { heads: hashes, finalized_number: 0 }), - ), + CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(our_view)), ).await; } } @@ -997,15 +994,15 @@ mod tests { overseer_signal( virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![test_state.relay_parent], - deactivated: smallvec![], + activated: [(test_state.relay_parent, Arc::new(JaegerSpan::Disabled))][..].into(), + deactivated: [][..].into(), }), ).await; overseer_send( virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent]), + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]), ), ).await; } diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index 562bb9ab60..561dda7d74 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -14,9 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::collections::{HashMap, HashSet}; -use std::time::Duration; -use std::task::Poll; +use std::{collections::{HashMap, HashSet}, time::Duration, task::Poll, sync::Arc}; use futures::{ StreamExt, @@ -30,20 +28,16 @@ use polkadot_primitives::v1::{ Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV, }; use polkadot_subsystem::{ - jaeger, + jaeger, PerLeafSpan, JaegerSpan, FromOverseer, OverseerSignal, SubsystemContext, messages::{ AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage, }, }; use polkadot_node_network_protocol::{ - v1 as protocol_v1, View, PeerId, ReputationChange as Rep, RequestId, - NetworkBridgeEvent, -}; -use polkadot_node_subsystem_util::{ - TimeoutExt as _, - metrics::{self, prometheus}, + v1 as protocol_v1, View, OurView, PeerId, ReputationChange as Rep, RequestId, NetworkBridgeEvent, }; +use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}}; use super::{modify_reputation, LOG_TARGET, Result}; @@ -172,7 +166,7 @@ struct PerRequest { #[derive(Default)] struct State { /// Our own view. - view: View, + view: OurView, /// Track all active collators and their views. peer_views: HashMap, @@ -215,6 +209,9 @@ struct State { /// Metrics. metrics: Metrics, + + /// Span per relay parent. + span_per_relay_parent: HashMap, } /// Another subsystem has requested to fetch collations on a particular leaf for some para. @@ -505,7 +502,7 @@ where state.peer_views.entry(origin).or_default(); } AdvertiseCollation(relay_parent, para_id) => { - let _span = jaeger::hash_span(&relay_parent, "advertising-collation"); + let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation")); state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent)); if let Some(collator) = state.known_collators.get(&origin) { @@ -517,7 +514,8 @@ where modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } Collation(request_id, receipt, pov) => { - let _span1 = jaeger::hash_span(&receipt.descriptor.relay_parent, "received-collation"); + let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent) + .map(|s| s.child("received-collation")); let _span2 = jaeger::pov_span(&pov, "received-collation"); received_collation(ctx, state, origin, request_id, receipt, pov).await; } @@ -556,9 +554,19 @@ async fn remove_relay_parent( #[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))] async fn handle_our_view_change( state: &mut State, - view: View, + view: OurView, ) -> Result<()> { - let old_view = std::mem::replace(&mut (state.view), view); + let old_view = std::mem::replace(&mut state.view, view); + + let added: HashMap> = state.view + .span_per_head() + .iter() + .filter(|v| !old_view.contains(&v.0)) + .map(|v| (v.0.clone(), v.1.clone())) + .collect(); + added.into_iter().for_each(|(h, s)| { + state.span_per_relay_parent.insert(h, PerLeafSpan::new(s, "validator-side")); + }); let removed = old_view .difference(&state.view) @@ -571,6 +579,7 @@ async fn handle_our_view_change( for removed in removed.into_iter() { state.recently_removed_heads.insert(removed.clone()); remove_relay_parent(state, removed).await?; + state.span_per_relay_parent.remove(&removed); } Ok(()) @@ -663,7 +672,7 @@ where ); } FetchCollation(relay_parent, collator_id, para_id, tx) => { - let _span = jaeger::hash_span(&relay_parent, "fetching-collation"); + let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("fetch-collation")); fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await; } ReportCollator(id) => { @@ -760,7 +769,7 @@ mod tests { use polkadot_primitives::v1::{BlockData, CollatorPair}; use polkadot_subsystem_testhelpers as test_helpers; - use polkadot_node_network_protocol::view; + use polkadot_node_network_protocol::our_view; #[derive(Clone)] struct TestState { @@ -873,7 +882,7 @@ mod tests { overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent]) + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) ) ).await; @@ -931,7 +940,7 @@ mod tests { overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent]) + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) ) ).await; @@ -1022,7 +1031,7 @@ mod tests { overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![Hash::repeat_byte(0x42)]) + NetworkBridgeEvent::OurViewChange(our_view![Hash::repeat_byte(0x42)]) ) ).await; @@ -1050,7 +1059,7 @@ mod tests { overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent]) + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) ) ).await; @@ -1134,8 +1143,8 @@ mod tests { overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent]) - ) + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) + ), ).await; let peer_b = PeerId::random(); diff --git a/polkadot/node/network/pov-distribution/Cargo.toml b/polkadot/node/network/pov-distribution/Cargo.toml index 3e3230ab85..a78eb21beb 100644 --- a/polkadot/node/network/pov-distribution/Cargo.toml +++ b/polkadot/node/network/pov-distribution/Cargo.toml @@ -19,7 +19,6 @@ polkadot-node-network-protocol = { path = "../../network/protocol" } assert_matches = "1.4.0" env_logger = "0.8.1" log = "0.4.11" -smallvec = "1.5.1" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index 6c882c8c00..5ee1f14063 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -40,7 +40,7 @@ use polkadot_node_subsystem_util::{ metrics::{self, prometheus}, }; use polkadot_node_network_protocol::{ - v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View, + v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, OurView, }; use futures::prelude::*; @@ -96,7 +96,7 @@ struct State { peer_state: HashMap, /// Our own view. - our_view: View, + our_view: OurView, /// Connect to relevant groups of validators at different relay parents. connection_requests: validator_discovery::ConnectionRequests, @@ -152,8 +152,8 @@ async fn handle_signal( OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { let _timer = state.metrics.time_handle_signal(); - for relay_parent in activated { - match request_validators_ctx(relay_parent.clone(), ctx).await { + for (relay_parent, _span) in activated { + match request_validators_ctx(relay_parent, ctx).await { Ok(vals_rx) => { let n_validators = match vals_rx.await? { Ok(v) => v.len(), diff --git a/polkadot/node/network/pov-distribution/src/tests.rs b/polkadot/node/network/pov-distribution/src/tests.rs index f0fec49654..285f479e76 100644 --- a/polkadot/node/network/pov-distribution/src/tests.rs +++ b/polkadot/node/network/pov-distribution/src/tests.rs @@ -1,11 +1,26 @@ +// Copyright 2020-2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + use super::*; -use std::time::Duration; +use std::{time::Duration, sync::Arc}; use assert_matches::assert_matches; use futures::executor; use tracing::trace; -use smallvec::smallvec; use sp_keyring::Sr25519Keyring; @@ -13,10 +28,10 @@ use polkadot_primitives::v1::{ AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId, ScheduledCore, ValidatorIndex, SessionIndex, SessionInfo, }; -use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest}; +use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan}; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt; -use polkadot_node_network_protocol::view; +use polkadot_node_network_protocol::{view, our_view}; fn make_pov(data: Vec) -> PoV { PoV { block_data: BlockData(data) } @@ -261,8 +276,8 @@ fn ask_validators_for_povs() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![test_state.relay_parent.clone()], - deactivated: smallvec![], + activated: [(test_state.relay_parent, Arc::new(JaegerSpan::Disabled))][..].into(), + deactivated: [][..].into(), }), ).await; @@ -429,8 +444,8 @@ fn ask_validators_for_povs() { overseer_signal( &mut virtual_overseer, OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: smallvec![next_leaf.clone()], - deactivated: smallvec![current.clone()], + activated: [(next_leaf, Arc::new(JaegerSpan::Disabled))][..].into(), + deactivated: [current.clone()][..].into(), }) ).await; @@ -583,7 +598,7 @@ fn distributes_to_those_awaiting_and_completes_local() { s }, - our_view: view![hash_a, hash_b], + our_view: our_view![hash_a, hash_b], metrics: Default::default(), connection_requests: Default::default(), }; @@ -666,7 +681,7 @@ fn we_inform_peers_with_same_view_we_are_awaiting() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -840,7 +855,7 @@ fn peer_view_change_leads_to_us_informing() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -913,7 +928,7 @@ fn peer_complete_fetch_and_is_rewarded() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -1003,7 +1018,7 @@ fn peer_punished_for_sending_bad_pov() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -1068,7 +1083,7 @@ fn peer_punished_for_sending_unexpected_pov() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -1131,7 +1146,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -1191,7 +1206,7 @@ fn peer_reported_for_awaiting_too_much() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -1278,7 +1293,7 @@ fn peer_reported_for_awaiting_outside_their_view() { s }, - our_view: view![hash_a, hash_b], + our_view: our_view![hash_a, hash_b], metrics: Default::default(), connection_requests: Default::default(), }; @@ -1342,7 +1357,7 @@ fn peer_reported_for_awaiting_outside_our_view() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -1421,7 +1436,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; @@ -1505,7 +1520,7 @@ fn peer_completing_request_no_longer_awaiting() { s }, - our_view: view![hash_a], + our_view: our_view![hash_a], metrics: Default::default(), connection_requests: Default::default(), }; diff --git a/polkadot/node/network/protocol/Cargo.toml b/polkadot/node/network/protocol/Cargo.toml index 273727d5b7..5829ccdf82 100644 --- a/polkadot/node/network/protocol/Cargo.toml +++ b/polkadot/node/network/protocol/Cargo.toml @@ -8,5 +8,6 @@ description = "Primitives types for the Node-side" [dependencies] polkadot-primitives = { path = "../../../primitives" } polkadot-node-primitives = { path = "../../primitives" } +polkadot-node-jaeger = { path = "../../jaeger" } parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index fe40e06be2..35dc3b3a17 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -21,10 +21,13 @@ use polkadot_primitives::v1::{Hash, BlockNumber}; use parity_scale_codec::{Encode, Decode}; -use std::convert::TryFrom; -use std::fmt; +use std::{convert::TryFrom, fmt, collections::HashMap}; pub use sc_network::{ReputationChange, PeerId}; +#[doc(hidden)] +pub use polkadot_node_jaeger::JaegerSpan; +#[doc(hidden)] +pub use std::sync::Arc; /// A unique identifier of a request. pub type RequestId = u64; @@ -44,7 +47,6 @@ impl fmt::Display for WrongVariant { impl std::error::Error for WrongVariant {} - /// The peer-sets that the network manages. Different subsystems will use different peer-sets. #[derive(Debug, Clone, Copy, PartialEq)] pub enum PeerSet { @@ -103,8 +105,8 @@ pub enum NetworkBridgeEvent { /// Peer's `View` has changed. PeerViewChange(PeerId, View), - /// Our `View` has changed. - OurViewChange(View), + /// Our view has changed. + OurViewChange(OurView), } macro_rules! impl_try_from { @@ -159,6 +161,72 @@ impl NetworkBridgeEvent { } } +/// Specialized wrapper around [`View`]. +/// +/// Besides the access to the view itself, it also gives access to the [`JaegerSpan`] per leave/head. +#[derive(Debug, Clone, Default)] +pub struct OurView { + view: View, + span_per_head: HashMap>, +} + +impl OurView { + /// Creates a new instance. + pub fn new(heads: impl IntoIterator)>, finalized_number: BlockNumber) -> Self { + let state_per_head = heads.into_iter().collect::>(); + + Self { + view: View { + heads: state_per_head.keys().cloned().collect(), + finalized_number, + }, + span_per_head: state_per_head, + } + } + + /// Returns the span per head map. + /// + /// For each head there exists one span in this map. + pub fn span_per_head(&self) -> &HashMap> { + &self.span_per_head + } +} + +impl PartialEq for OurView { + fn eq(&self, other: &Self) -> bool { + self.view == other.view + } +} + +impl std::ops::Deref for OurView { + type Target = View; + + fn deref(&self) -> &View { + &self.view + } +} + +/// Construct a new [`OurView`] with the given chain heads, finalized number 0 and disabled [`JaegerSpan`]'s. +/// +/// NOTE: Use for tests only. +/// +/// # Example +/// +/// ``` +/// # use polkadot_node_network_protocol::our_view; +/// # use polkadot_primitives::v1::Hash; +/// let our_view = our_view![Hash::repeat_byte(1), Hash::repeat_byte(2)]; +/// ``` +#[macro_export] +macro_rules! our_view { + ( $( $hash:expr ),* $(,)? ) => { + $crate::OurView::new( + vec![ $( $hash.clone() ),* ].into_iter().map(|h| (h, $crate::Arc::new($crate::JaegerSpan::Disabled))), + 0, + ) + }; +} + /// A succinct representation of a peer's view. This consists of a bounded amount of chain heads /// and the highest known finalized block number. /// @@ -171,18 +239,21 @@ pub struct View { pub finalized_number: BlockNumber, } - /// Construct a new view with the given chain heads and finalized number 0. +/// /// NOTE: Use for tests only. +/// /// # Example /// -/// ```ignore -/// view![Hash::repeat_byte(1), Hash::repeat_byte(2)] +/// ``` +/// # use polkadot_node_network_protocol::view; +/// # use polkadot_primitives::v1::Hash; +/// let view = view![Hash::repeat_byte(1), Hash::repeat_byte(2)]; /// ``` #[macro_export] macro_rules! view { ( $( $hash:expr ),* $(,)? ) => { - View { heads: vec![ $( $hash.clone() ),* ], finalized_number: 0 } + $crate::View { heads: vec![ $( $hash.clone() ),* ], finalized_number: 0 } }; } diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 6fee53975c..05ff1753f2 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -23,9 +23,8 @@ #![warn(missing_docs)] use polkadot_subsystem::{ - jaeger, Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, - ActiveLeavesUpdate, FromOverseer, OverseerSignal, + ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, messages::{ AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest, @@ -37,7 +36,7 @@ use polkadot_primitives::v1::{ Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash, }; use polkadot_node_network_protocol::{ - v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent, + v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent, OurView, }; use futures::prelude::*; @@ -390,14 +389,14 @@ struct ActiveHeadData { /// How many `Seconded` statements we've seen per validator. seconded_counts: HashMap, /// A Jaeger span for this head, so we can attach data to it. - span: jaeger::JaegerSpan, + span: PerLeafSpan, } impl ActiveHeadData { fn new( validators: Vec, session_index: sp_staking::SessionIndex, - relay_parent: &Hash, + span: PerLeafSpan, ) -> Self { ActiveHeadData { candidates: Default::default(), @@ -405,7 +404,7 @@ impl ActiveHeadData { validators, session_index, seconded_counts: Default::default(), - span: jaeger::hash_span(&relay_parent, "statement-dist-active"), + span, } } @@ -839,7 +838,7 @@ async fn handle_network_update( peers: &mut HashMap, active_heads: &mut HashMap, ctx: &mut impl SubsystemContext, - our_view: &mut View, + our_view: &mut OurView, update: NetworkBridgeEvent, metrics: &Metrics, statement_listeners: &mut StatementListeners, @@ -930,7 +929,7 @@ impl StatementDistribution { mut ctx: impl SubsystemContext, ) -> SubsystemResult<()> { let mut peers: HashMap = HashMap::new(); - let mut our_view = View::default(); + let mut our_view = OurView::default(); let mut active_heads: HashMap = HashMap::new(); let mut statement_listeners = StatementListeners::new(); let metrics = self.metrics; @@ -941,7 +940,9 @@ impl StatementDistribution { FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => { let _timer = metrics.time_active_leaves_update(); - for relay_parent in activated { + for (relay_parent, span) in activated { + let span = PerLeafSpan::new(span, "statement-distribution"); + let (validators, session_index) = { let (val_tx, val_rx) = oneshot::channel(); let (session_tx, session_rx) = oneshot::channel(); @@ -981,7 +982,7 @@ impl StatementDistribution { }; active_heads.entry(relay_parent) - .or_insert(ActiveHeadData::new(validators, session_index, &relay_parent)); + .or_insert(ActiveHeadData::new(validators, session_index, span)); } } FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => { @@ -1117,7 +1118,8 @@ mod tests { use futures::executor::{self, block_on}; use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore}; use sc_keystore::LocalKeystore; - use polkadot_node_network_protocol::{view, ObservedRole}; + use polkadot_node_network_protocol::{view, ObservedRole, our_view}; + use polkadot_subsystem::JaegerSpan; #[test] fn active_head_accepts_only_2_seconded_per_validator() { @@ -1155,7 +1157,11 @@ mod tests { c }; - let mut head_data = ActiveHeadData::new(validators, session_index, &parent_hash); + let mut head_data = ActiveHeadData::new( + validators, + session_index, + PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"), + ); let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory()); let alice_public = SyncCryptoStore::sr25519_generate_new( @@ -1413,7 +1419,11 @@ mod tests { ).unwrap(); let new_head_data = { - let mut data = ActiveHeadData::new(validators, session_index, &hash_c); + let mut data = ActiveHeadData::new( + validators, + session_index, + PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"), + ); let noted = data.note_statement(block_on(SignedFullStatement::sign( &keystore, @@ -1665,7 +1675,7 @@ mod tests { let test_fut = async move { // register our active heads. handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: vec![hash_a].into(), + activated: vec![(hash_a, Arc::new(JaegerSpan::Disabled))].into(), deactivated: vec![].into(), }))).await; @@ -1718,7 +1728,7 @@ mod tests { handle.send(FromOverseer::Communication { msg: StatementDistributionMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(view![hash_a]) + NetworkBridgeEvent::OurViewChange(our_view![hash_a]) ) }).await; diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 9e5ff2a754..687a9ceab2 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -88,12 +88,11 @@ use polkadot_subsystem::messages::{ }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, - SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, + SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, JaegerSpan, jaeger, }; use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}}; use polkadot_node_primitives::SpawnNamed; - // A capacity of bounded channels inside the overseer. const CHANNEL_CAPACITY: usize = 1024; // A graceful `Overseer` teardown time delay. @@ -490,6 +489,9 @@ pub struct Overseer { /// External listeners waiting for a hash to be in the active-leave set. activation_external_listeners: HashMap>>>, + /// Stores the [`JaegerSpan`] per active leaf. + span_per_active_leaf: HashMap>, + /// A set of leaves that `Overseer` starts working with. /// /// Drained at the beginning of `run` and never used again. @@ -1277,6 +1279,7 @@ where leaves, active_leaves, metrics, + span_per_active_leaf: Default::default(), }; Ok((this, handler)) @@ -1321,9 +1324,9 @@ where let mut update = ActiveLeavesUpdate::default(); for (hash, number) in std::mem::take(&mut self.leaves) { - update.activated.push(hash); let _ = self.active_leaves.insert(hash, number); - self.on_head_activated(&hash); + let span = self.on_head_activated(&hash); + update.activated.push((hash, span)); } self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; @@ -1390,32 +1393,26 @@ where #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { - let mut update = ActiveLeavesUpdate::default(); + match self.active_leaves.entry(block.hash) { + hash_map::Entry::Vacant(entry) => entry.insert(block.number), + hash_map::Entry::Occupied(entry) => { + debug_assert_eq!(*entry.get(), block.number); + return Ok(()); + } + }; + + let span = self.on_head_activated(&block.hash); + let mut update = ActiveLeavesUpdate::start_work(block.hash, span); if let Some(number) = self.active_leaves.remove(&block.parent_hash) { - if let Some(expected_parent_number) = block.number.checked_sub(1) { - debug_assert_eq!(expected_parent_number, number); - } + debug_assert_eq!(block.number.saturating_sub(1), number); update.deactivated.push(block.parent_hash); self.on_head_deactivated(&block.parent_hash); } - match self.active_leaves.entry(block.hash) { - hash_map::Entry::Vacant(entry) => { - update.activated.push(block.hash); - let _ = entry.insert(block.number); - self.on_head_activated(&block.hash); - }, - hash_map::Entry::Occupied(entry) => { - debug_assert_eq!(*entry.get(), block.number); - } - } - self.clean_up_external_listeners(); - self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; - - Ok(()) + self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] @@ -1519,7 +1516,7 @@ where } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - fn on_head_activated(&mut self, hash: &Hash) { + fn on_head_activated(&mut self, hash: &Hash) -> Arc { self.metrics.on_head_activated(); if let Some(listeners) = self.activation_external_listeners.remove(hash) { for listener in listeners { @@ -1527,15 +1524,17 @@ where let _ = listener.send(Ok(())); } } + + let span = Arc::new(jaeger::hash_span(hash, "leave activated")); + self.span_per_active_leaf.insert(*hash, span.clone()); + span } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] fn on_head_deactivated(&mut self, hash: &Hash) { self.metrics.on_head_deactivated(); - if let Some(listeners) = self.activation_external_listeners.remove(hash) { - // clean up and signal to listeners the block is deactivated - drop(listeners); - } + self.activation_external_listeners.remove(hash); + self.span_per_active_leaf.remove(hash); } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] @@ -1615,7 +1614,7 @@ mod tests { use futures::{executor, pin_mut, select, channel::mpsc, FutureExt, pending}; use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash}; - use polkadot_subsystem::messages::RuntimeApiRequest; + use polkadot_subsystem::{messages::RuntimeApiRequest, JaegerSpan}; use polkadot_node_primitives::{Collation, CollationGenerationConfig}; use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent}; @@ -1980,13 +1979,16 @@ mod tests { handler.block_imported(third_block).await; let expected_heartbeats = vec![ - OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)), + OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work( + first_block_hash, + Arc::new(JaegerSpan::Disabled), + )), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: [second_block_hash].as_ref().into(), + activated: [(second_block_hash, Arc::new(JaegerSpan::Disabled))].as_ref().into(), deactivated: [first_block_hash].as_ref().into(), }), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: [third_block_hash].as_ref().into(), + activated: [(third_block_hash, Arc::new(JaegerSpan::Disabled))].as_ref().into(), deactivated: [second_block_hash].as_ref().into(), }), ]; @@ -2074,7 +2076,10 @@ mod tests { let expected_heartbeats = vec![ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - activated: [first_block_hash, second_block_hash].as_ref().into(), + activated: [ + (first_block_hash, Arc::new(JaegerSpan::Disabled)), + (second_block_hash, Arc::new(JaegerSpan::Disabled)), + ].as_ref().into(), ..Default::default() }), OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index 6484fa4a91..b85db71f0e 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -19,6 +19,7 @@ tracing-futures = "0.2.4" polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem = { path = "../subsystem" } +polkadot-node-jaeger = { path = "../jaeger" } polkadot-primitives = { path = "../../primitives" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 0bc69cb3d1..e9fd475f10 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -29,6 +29,7 @@ use polkadot_node_subsystem::{ messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent}, FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult, }; +use polkadot_node_jaeger::JaegerSpan; use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream}; use futures_timer::Delay; use parity_scale_codec::Encode; @@ -36,27 +37,14 @@ use pin_project::pin_project; use polkadot_primitives::v1::{ CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData, GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption, - SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, - SessionInfo, -}; -use sp_core::{ - traits::SpawnNamed, - Public + SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, SessionInfo, }; +use sp_core::{traits::SpawnNamed, Public}; use sp_application_crypto::AppKey; -use sp_keystore::{ - CryptoStore, - SyncCryptoStorePtr, - Error as KeystoreError, -}; +use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError}; use std::{ - collections::{HashMap, hash_map::Entry}, - convert::{TryFrom, TryInto}, - marker::Unpin, - pin::Pin, - task::{Poll, Context}, - time::Duration, - fmt, + collections::{HashMap, hash_map::Entry}, convert::{TryFrom, TryInto}, marker::Unpin, pin::Pin, task::{Poll, Context}, + time::Duration, fmt, sync::Arc, }; use streamunordered::{StreamUnordered, StreamYield}; use thiserror::Error; @@ -494,6 +482,7 @@ pub trait JobTrait: Unpin { /// The job should be ended when `receiver` returns `None`. fn run( parent: Hash, + span: Arc, run_args: Self::RunArgs, metrics: Self::Metrics, receiver: mpsc::Receiver, @@ -561,14 +550,20 @@ impl Jobs { } /// Spawn a new job for this `parent_hash`, with whatever args are appropriate. - fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs, metrics: Job::Metrics) -> Result<(), Error> { + fn spawn_job( + &mut self, + parent_hash: Hash, + span: Arc, + run_args: Job::RunArgs, + metrics: Job::Metrics, + ) -> Result<(), Error> { let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY); let err_tx = self.errors.clone(); let (future, abort_handle) = future::abortable(async move { - if let Err(e) = Job::run(parent_hash, run_args, metrics, to_job_rx, from_job_tx).await { + if let Err(e) = Job::run(parent_hash, span, run_args, metrics, to_job_rx, from_job_tx).await { tracing::error!( job = Job::NAME, parent_hash = %parent_hash, @@ -782,9 +777,9 @@ where activated, deactivated, }))) => { - for hash in activated { + for (hash, span) in activated { let metrics = metrics.clone(); - if let Err(e) = jobs.spawn_job(hash, run_args.clone(), metrics) { + if let Err(e) = jobs.spawn_job(hash, span, run_args.clone(), metrics) { tracing::error!( job = Job::NAME, err = ?e, @@ -998,13 +993,13 @@ mod tests { use thiserror::Error; use polkadot_node_subsystem::{ messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, - SpawnedSubsystem, + SpawnedSubsystem, JaegerSpan, }; use assert_matches::assert_matches; use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt}; use polkadot_primitives::v1::Hash; use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context}; - use std::{pin::Pin, time::Duration}; + use std::{pin::Pin, time::Duration, sync::Arc}; // basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do; // you can leave the subsystem itself to the job manager. @@ -1040,6 +1035,7 @@ mod tests { // this function is in charge of creating and executing the job's main loop fn run( _: Hash, + _: Arc, run_args: Self::RunArgs, _metrics: Self::Metrics, receiver: mpsc::Receiver, @@ -1123,7 +1119,7 @@ mod tests { test_harness(true, |mut overseer_handle, err_rx| async move { overseer_handle .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(relay_parent), + ActiveLeavesUpdate::start_work(relay_parent, Arc::new(JaegerSpan::Disabled)), ))) .await; assert_matches!( @@ -1152,7 +1148,7 @@ mod tests { test_harness(true, |mut overseer_handle, err_rx| async move { overseer_handle .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves( - ActiveLeavesUpdate::start_work(relay_parent), + ActiveLeavesUpdate::start_work(relay_parent, Arc::new(JaegerSpan::Disabled)), ))) .await; diff --git a/polkadot/node/subsystem/Cargo.toml b/polkadot/node/subsystem/Cargo.toml index 4a59a32816..d9ea049a57 100644 --- a/polkadot/node/subsystem/Cargo.toml +++ b/polkadot/node/subsystem/Cargo.toml @@ -22,6 +22,7 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-primitives = { path = "../../primitives" } polkadot-statement-table = { path = "../../statement-table" } +polkadot-node-jaeger = { path = "../jaeger" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } smallvec = "1.5.1" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/subsystem/src/errors.rs b/polkadot/node/subsystem/src/errors.rs index 243577c353..5af573c87f 100644 --- a/polkadot/node/subsystem/src/errors.rs +++ b/polkadot/node/subsystem/src/errors.rs @@ -59,15 +59,3 @@ impl core::fmt::Display for ChainApiError { } impl std::error::Error for ChainApiError {} - - -/// A description of an error causing the chain API request to be unservable. -#[derive(Debug, thiserror::Error)] -#[allow(missing_docs)] -pub enum JaegerError { - #[error("Already launched the collector thread")] - AlreadyLaunched, - - #[error("Missing jaeger configuration")] - MissingConfiguration, -} diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 83b72111d4..ad049bf39d 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -22,7 +22,7 @@ #![warn(missing_docs)] -use std::pin::Pin; +use std::{pin::Pin, sync::Arc, fmt}; use futures::prelude::*; use futures::channel::{mpsc, oneshot}; @@ -36,8 +36,9 @@ use crate::messages::AllMessages; pub mod errors; pub mod messages; -pub mod jaeger; -pub use crate::jaeger::*; + +pub use polkadot_node_jaeger as jaeger; +pub use jaeger::*; /// How many slots are stack-reserved for active leaves updates /// @@ -48,18 +49,21 @@ const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8; /// Changes in the set of active leaves: the parachain heads which we care to work on. /// /// Note that the activated and deactivated fields indicate deltas, not complete sets. -#[derive(Clone, Debug, Default, Eq)] +#[derive(Clone, Default)] pub struct ActiveLeavesUpdate { - /// New relay chain block hashes of interest. - pub activated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, + /// New relay chain block hashes of interest and their associated [`JaegerSpan`]. + /// + /// NOTE: Each span should only be kept active as long as the leaf is considered active and should be dropped + /// when the leaf is deactivated. + pub activated: SmallVec<[(Hash, Arc); ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, /// Relay chain block hashes no longer of interest. pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>, } impl ActiveLeavesUpdate { /// Create a ActiveLeavesUpdate with a single activated hash - pub fn start_work(hash: Hash) -> Self { - Self { activated: [hash][..].into(), ..Default::default() } + pub fn start_work(hash: Hash, span: Arc) -> Self { + Self { activated: [(hash, span)][..].into(), ..Default::default() } } /// Create a ActiveLeavesUpdate with a single deactivated hash @@ -79,11 +83,27 @@ impl PartialEq for ActiveLeavesUpdate { /// Instead, it means equality when `activated` and `deactivated` are considered as sets. fn eq(&self, other: &Self) -> bool { self.activated.len() == other.activated.len() && self.deactivated.len() == other.deactivated.len() - && self.activated.iter().all(|a| other.activated.contains(a)) + && self.activated.iter().all(|a| other.activated.iter().any(|o| a.0 == o.0)) && self.deactivated.iter().all(|a| other.deactivated.contains(a)) } } +impl fmt::Debug for ActiveLeavesUpdate { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + struct Activated<'a>(&'a [(Hash, Arc)]); + impl fmt::Debug for Activated<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_list().entries(self.0.iter().map(|e| e.0)).finish() + } + } + + f.debug_struct("ActiveLeavesUpdate") + .field("activated", &Activated(&self.activated)) + .field("deactivated", &self.deactivated) + .finish() + } +} + /// Signals sent by an overseer to a subsystem. #[derive(PartialEq, Clone, Debug)] pub enum OverseerSignal { @@ -139,7 +159,7 @@ pub enum SubsystemError { Prometheus(#[from] substrate_prometheus_endpoint::PrometheusError), #[error(transparent)] - Jaeger(#[from] errors::JaegerError), + Jaeger(#[from] JaegerError), #[error("Failed to {0}")] Context(String), diff --git a/polkadot/runtime/rococo/src/lib.rs b/polkadot/runtime/rococo/src/lib.rs index d5dc73a7ce..e15b8dcf9f 100644 --- a/polkadot/runtime/rococo/src/lib.rs +++ b/polkadot/runtime/rococo/src/lib.rs @@ -105,7 +105,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { spec_name: create_runtime_str!("rococo"), impl_name: create_runtime_str!("parity-rococo-v1"), authoring_version: 0, - spec_version: 13, + spec_version: 14, impl_version: 0, #[cfg(not(feature = "disable-runtime-api"))] apis: RUNTIME_API_VERSIONS,