Add one Jaeger span per relay parent (#2196)

* Add one Jaeger span per relay parent

This adds one Jaeger span per relay parent, instead of always creating
new spans per relay parent. This should improve the UI view, because
subsystems are now grouped below one common span.

* Fix doc tests

* Replace `PerLeaveSpan` with `PerLeafSpan`

* More renaming

* Moare

* Update node/subsystem/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* Skip the spans

* Increase `spec_version`

Co-authored-by: Andronik Ordian <write@reusable.software>
This commit is contained in:
Bastian Köcher
2021-01-05 15:09:25 +01:00
committed by GitHub
parent ceb9e2161c
commit 5be092894e
32 changed files with 535 additions and 322 deletions
+18 -3
View File
@@ -4969,7 +4969,6 @@ dependencies = [
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"smallvec 1.5.1",
"sp-core",
"sp-keyring",
"thiserror",
@@ -5057,7 +5056,6 @@ dependencies = [
"polkadot-overseer",
"polkadot-primitives",
"sc-service",
"smallvec 1.5.1",
"sp-core",
"thiserror",
"tracing",
@@ -5208,11 +5206,27 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "polkadot-node-jaeger"
version = "0.1.0"
dependencies = [
"async-std",
"lazy_static",
"log",
"mick-jaeger",
"parking_lot 0.11.1",
"polkadot-primitives",
"sc-network",
"sp-core",
"thiserror",
]
[[package]]
name = "polkadot-node-network-protocol"
version = "0.1.0"
dependencies = [
"parity-scale-codec",
"polkadot-node-jaeger",
"polkadot-node-primitives",
"polkadot-primitives",
"sc-network",
@@ -5247,6 +5261,7 @@ dependencies = [
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.2",
"polkadot-node-jaeger",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem-test-helpers",
@@ -5297,6 +5312,7 @@ dependencies = [
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.2",
"polkadot-node-jaeger",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
@@ -5368,7 +5384,6 @@ dependencies = [
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"smallvec 1.5.1",
"sp-core",
"sp-keyring",
"thiserror",
+1
View File
@@ -65,6 +65,7 @@ members = [
"node/subsystem",
"node/subsystem-test-helpers",
"node/subsystem-util",
"node/jaeger",
"node/test/client",
"node/test/service",
"parachain/test-parachains",
+17 -14
View File
@@ -126,12 +126,17 @@ impl CollationGenerationSubsystem {
// follow the procedure from the guide
if let Some(config) = &self.config {
let metrics = self.metrics.clone();
if let Err(err) =
handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await
{
if let Err(err) = handle_new_activations(
config.clone(),
activated.into_iter().map(|v| v.0),
ctx,
metrics,
sender,
).await {
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activations");
};
}
}
false
}
Ok(Signal(Conclude)) => true,
@@ -164,10 +169,10 @@ where
Context: SubsystemContext<Message = CollationGenerationMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(async move {
let future = async move {
self.run(ctx).await;
Ok(())
});
}.boxed();
SpawnedSubsystem {
name: "collation-generation-subsystem",
@@ -176,10 +181,10 @@ where
}
}
#[tracing::instrument(level = "trace", skip(ctx, metrics, sender), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, metrics, sender, activated), fields(subsystem = LOG_TARGET))]
async fn handle_new_activations<Context: SubsystemContext>(
config: Arc<CollationGenerationConfig>,
activated: &[Hash],
activated: impl IntoIterator<Item = Hash>,
ctx: &mut Context,
metrics: Metrics,
sender: &mpsc::Sender<AllMessages>,
@@ -189,11 +194,9 @@ async fn handle_new_activations<Context: SubsystemContext>(
let _overall_timer = metrics.time_new_activations();
for relay_parent in activated.iter().copied() {
for relay_parent in activated {
let _relay_parent_timer = metrics.time_new_activations_relay_parent();
// double-future magic happens here: the first layer of requests takes a mutable borrow of the context, and
// returns a receiver. The second layer of requests actually polls those receivers to completion.
let (availability_cores, validators) = join!(
request_availability_cores_ctx(relay_parent, ctx).await?,
request_validators_ctx(relay_parent, ctx).await?,
@@ -544,7 +547,7 @@ mod tests {
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(
test_config(123u32),
&subsystem_activated_hashes,
subsystem_activated_hashes,
&mut ctx,
Metrics(None),
&tx,
@@ -623,7 +626,7 @@ mod tests {
let (tx, _rx) = mpsc::channel(0);
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(test_config(16), &activated_hashes, &mut ctx, Metrics(None), &tx)
handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
});
@@ -700,7 +703,7 @@ mod tests {
let sent_messages = Arc::new(Mutex::new(Vec::new()));
let subsystem_sent_messages = sent_messages.clone();
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, Metrics(None), &tx)
handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
-1
View File
@@ -26,7 +26,6 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master
log = "0.4.11"
env_logger = "0.8.2"
assert_matches = "1.4.0"
smallvec = "1.5.1"
kvdb-memorydb = "0.7.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+1 -1
View File
@@ -534,7 +534,7 @@ where
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate { activated, .. })
) => {
for activated in activated.into_iter() {
for (activated, _span) in activated.into_iter() {
process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?;
}
}
+9 -10
View File
@@ -23,7 +23,6 @@ use futures::{
executor,
Future,
};
use smallvec::smallvec;
use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateDescriptor, CandidateReceipt, HeadData,
@@ -31,7 +30,7 @@ use polkadot_primitives::v1::{
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem::{
ActiveLeavesUpdate, errors::RuntimeApiError,
ActiveLeavesUpdate, errors::RuntimeApiError, JaegerSpan,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
@@ -182,8 +181,8 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}),
).await;
@@ -516,8 +515,8 @@ fn stored_data_kept_until_finalized() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}),
).await;
@@ -620,8 +619,8 @@ fn stored_chunk_kept_until_finalized() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}),
).await;
@@ -758,8 +757,8 @@ fn forkfullness_works() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf_1.clone(), new_leaf_2.clone()],
deactivated: smallvec![],
activated: vec![(new_leaf_1, Arc::new(JaegerSpan::Disabled)), (new_leaf_2, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}),
).await;
+8 -4
View File
@@ -37,7 +37,7 @@ use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
};
use polkadot_subsystem::{
jaeger::{self, JaegerSpan},
JaegerSpan, PerLeafSpan,
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
@@ -923,9 +923,10 @@ impl util::JobTrait for CandidateBackingJob {
const NAME: &'static str = "CandidateBackingJob";
#[tracing::instrument(skip(keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(span, keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
fn run(
parent: Hash,
span: Arc<JaegerSpan>,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
rx_to: mpsc::Receiver<Self::ToJob>,
@@ -952,7 +953,7 @@ impl util::JobTrait for CandidateBackingJob {
}
}
let span = jaeger::hash_span(&parent, "run:backing");
let span = PerLeafSpan::new(span, "backing");
let _span = span.child("runtime-apis");
let (validators, groups, session_index, cores) = futures::try_join!(
@@ -1340,7 +1341,10 @@ mod tests {
) {
// Start work on some new parent.
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(test_state.relay_parent)))
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
test_state.relay_parent,
Arc::new(JaegerSpan::Disabled),
)))
).await;
// Check that subsystem job issues a request for a validator set.
@@ -23,7 +23,7 @@
use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use polkadot_node_subsystem::{
jaeger,
jaeger, PerLeafSpan, JaegerSpan,
messages::{
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest,
@@ -34,7 +34,7 @@ use polkadot_node_subsystem_util::{
self as util, JobManager, JobTrait, Validator, FromJobCommand, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use std::{pin::Pin, time::Duration, iter::FromIterator};
use std::{pin::Pin, time::Duration, iter::FromIterator, sync::Arc};
use wasm_timer::{Delay, Instant};
/// Delay between starting a bitfield signing job and its attempting to create a bitfield.
@@ -215,9 +215,10 @@ impl JobTrait for BitfieldSigningJob {
const NAME: &'static str = "BitfieldSigningJob";
/// Run a job for the parent block indicated
#[tracing::instrument(skip(keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(span, keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
relay_parent: Hash,
span: Arc<JaegerSpan>,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
@@ -225,7 +226,7 @@ impl JobTrait for BitfieldSigningJob {
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
let span = jaeger::hash_span(&relay_parent, "run:bitfield-signing");
let span = PerLeafSpan::new(span, "bitfield-signing");
let _span = span.child("delay");
let wait_until = Instant::now() + JOB_DELAY;
@@ -25,7 +25,7 @@ use futures::{
};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem::{
jaeger,
jaeger, JaegerSpan, PerLeafSpan,
errors::ChainApiError,
messages::{
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
@@ -39,7 +39,7 @@ use polkadot_node_subsystem_util::{
use polkadot_primitives::v1::{
CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV,
};
use std::pin::Pin;
use std::{pin::Pin, sync::Arc};
use thiserror::Error;
const LOG_TARGET: &'static str = "candidate_selection";
@@ -95,12 +95,13 @@ impl JobTrait for CandidateSelectionJob {
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
relay_parent: Hash,
span: Arc<JaegerSpan>,
keystore: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let span = jaeger::hash_span(&relay_parent, "candidate-selection:run");
let span = PerLeafSpan::new(span, "candidate-selection");
async move {
let _span = span.child("query-runtime");
let (groups, cores) = futures::try_join!(
+6 -10
View File
@@ -25,8 +25,7 @@ use futures::{
prelude::*,
};
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
jaeger,
errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, JaegerSpan,
messages::{
AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage,
@@ -40,7 +39,7 @@ use polkadot_primitives::v1::{
BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption,
SignedAvailabilityBitfield, ValidatorIndex,
};
use std::{pin::Pin, collections::BTreeMap};
use std::{pin::Pin, collections::BTreeMap, sync::Arc};
use thiserror::Error;
use futures_timer::Delay;
@@ -140,9 +139,10 @@ impl JobTrait for ProvisioningJob {
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
#[tracing::instrument(skip(_run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(span, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
relay_parent: Hash,
span: Arc<JaegerSpan>,
_run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<ProvisionerMessage>,
@@ -156,11 +156,7 @@ impl JobTrait for ProvisioningJob {
receiver,
);
let span = jaeger::hash_span(&relay_parent, "provisioner");
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
job.run_loop(&span).await
job.run_loop(PerLeafSpan::new(span, "provisioner")).await
}
.boxed()
}
@@ -186,7 +182,7 @@ impl ProvisioningJob {
}
}
async fn run_loop(mut self, span: &jaeger::JaegerSpan) -> Result<(), Error> {
async fn run_loop(mut self, span: PerLeafSpan) -> Result<(), Error> {
use ProvisionerMessage::{
ProvisionableData, RequestBlockAuthorshipData, RequestInherentData,
};
+17
View File
@@ -0,0 +1,17 @@
[package]
name = "polkadot-node-jaeger"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Polkadot Jaeger primitives"
[dependencies]
async-std = "1.8.0"
mick-jaeger = "0.1.2"
lazy_static = "1.4"
parking_lot = "0.11.1"
polkadot-primitives = { path = "../../primitives" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.23"
log = "0.4.11"
@@ -14,7 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Jaeger integration.
//! Polkadot Jaeger related primitives
//!
//! Provides primitives used by Polkadot for interfacing with Jaeger.
//!
//! # Integration
//!
//! See <https://www.jaegertracing.io/> for an introduction.
//!
@@ -39,15 +43,22 @@
//! -p 9411:9411 \
//! docker.io/jaegertracing/all-in-one:1.21
//! ```
//!
use polkadot_node_primitives::SpawnNamed;
use sp_core::traits::SpawnNamed;
use polkadot_primitives::v1::{Hash, PoV, CandidateHash};
use parking_lot::RwLock;
use std::sync::Arc;
use std::result;
pub use crate::errors::JaegerError;
use std::{sync::Arc, result};
/// Errors related to the Jaeger integration.
///
/// The variants cover a collector thread that has already been launched and
/// a Jaeger configuration that is missing.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum JaegerError {
#[error("Already launched the collector thread")]
AlreadyLaunched,
#[error("Missing jaeger configuration")]
MissingConfiguration,
}
lazy_static::lazy_static! {
static ref INSTANCE: RwLock<Jaeger> = RwLock::new(Jaeger::None);
@@ -102,6 +113,50 @@ impl JaegerConfigBuilder {
}
}
/// A special "per leaf span".
///
/// Essentially this span wraps two spans:
///
/// 1. The span that is created per leaf in the overseer.
/// 2. Some child span of the per-leaf span.
///
/// This just works as auxiliary structure to easily store both.
#[derive(Debug)]
pub struct PerLeafSpan {
/// The span that is created per leaf in the overseer.
leaf_span: Arc<JaegerSpan>,
/// A child span of `leaf_span`.
span: JaegerSpan,
}
impl PerLeafSpan {
    /// Builds a `PerLeafSpan` from the overseer's per-leaf span.
    ///
    /// A child span called `name` is derived from `leaf_span`. The child and the
    /// `leaf_span` itself are both kept in the returned object; the child is
    /// reachable through the [`Deref`](std::ops::Deref) implementation.
    pub fn new(leaf_span: Arc<JaegerSpan>, name: impl Into<String>) -> Self {
        Self {
            // The child is created first, while `leaf_span` is only borrowed;
            // the `Arc` is moved into the struct afterwards.
            span: leaf_span.child(name),
            leaf_span,
        }
    }

    /// Gives access to the per-leaf span this object was created from.
    pub fn leaf_span(&self) -> &Arc<JaegerSpan> {
        &self.leaf_span
    }
}
/// Dereferencing a `PerLeafSpan` yields its child span.
impl std::ops::Deref for PerLeafSpan {
    type Target = JaegerSpan;

    fn deref(&self) -> &Self::Target {
        &self.span
    }
}
/// A wrapper type for a span.
///
/// Handles running with and without jaeger.
@@ -120,6 +175,7 @@ impl JaegerSpan {
Self::Disabled => Self::Disabled,
}
}
/// Add an additional tag to the span.
pub fn add_string_tag(&mut self, tag: &str, value: &str) {
match self {
@@ -32,7 +32,7 @@ use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View, OurView,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
@@ -45,10 +45,8 @@ use polkadot_subsystem::messages::{
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
jaeger,
errors::{ChainApiError, RuntimeApiError},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError,
jaeger, errors::{ChainApiError, RuntimeApiError},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError,
};
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
@@ -128,7 +126,7 @@ struct ProtocolState {
peer_views: HashMap<PeerId, View>,
/// Our own view.
view: View,
view: OurView,
/// Caches a mapping of relay parents or ancestor to live candidate hashes.
/// Allows fast intersection of live candidates with views and consecutive unioning.
@@ -278,8 +276,8 @@ impl ProtocolState {
}
}
// Removes all entries from live_under which aren't referenced in the ancestry of
// one of our live relay-chain heads.
/// Removes all entries from live_under which aren't referenced in the ancestry of
/// one of our live relay-chain heads.
fn clean_up_live_under_cache(&mut self) {
let extended_view: HashSet<_> = self.per_relay_parent.iter()
.map(|(r_hash, v)| v.ancestors.iter().cloned().chain(iter::once(*r_hash)))
@@ -353,7 +351,7 @@ async fn handle_our_view_change<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
state: &mut ProtocolState,
view: View,
view: OurView,
metrics: &Metrics,
) -> Result<()>
where
@@ -845,11 +843,11 @@ where
}
}
// Metadata about a candidate that is part of the live_candidates set.
//
// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This
// information is propagated to the higher level where it can be used to create data entries. Cached candidates
// already have entries associated with them, and thus don't need this metadata to be fetched.
/// Metadata about a candidate that is part of the live_candidates set.
///
/// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This
/// information is propagated to the higher level where it can be used to create data entries. Cached candidates
/// already have entries associated with them, and thus don't need this metadata to be fetched.
#[derive(Debug)]
enum FetchedLiveCandidate {
Cached,
@@ -17,7 +17,7 @@
use super::*;
use assert_matches::assert_matches;
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex,
@@ -398,7 +398,7 @@ async fn expect_chunks_network_message(
async fn change_our_view(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
view: View,
view: OurView,
validator_public: &[ValidatorId],
ancestors: Vec<Hash>,
session_per_relay_parent: HashMap<Hash, SessionIndex>,
@@ -574,7 +574,7 @@ fn check_views() {
let genesis = Hash::repeat_byte(0xAA);
change_our_view(
&mut virtual_overseer,
view![current],
our_view![current],
&validator_public,
vec![ancestors[0], genesis],
hashmap! { current => 1, genesis => 1 },
@@ -641,7 +641,7 @@ fn check_views() {
peer_b_2 => view![ancestors[0]],
},
);
assert_eq!(view, view![current]);
assert_eq!(view, our_view![current]);
}
};
}
@@ -676,7 +676,7 @@ fn reputation_verification() {
change_our_view(
&mut virtual_overseer,
view![current],
our_view![current],
&validator_public,
vec![ancestors[0]],
hashmap! { current => 1 },
@@ -768,7 +768,7 @@ fn not_a_live_candidate_is_detected() {
change_our_view(
&mut virtual_overseer,
view![current],
our_view![current],
&validator_public,
vec![ancestors[0]],
hashmap! { current => 1 },
@@ -816,7 +816,7 @@ fn peer_change_view_before_us() {
change_our_view(
&mut virtual_overseer,
view![current],
our_view![current],
&validator_public,
vec![ancestors[0]],
hashmap! { current => 1 },
@@ -863,7 +863,7 @@ fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() {
change_our_view(
&mut virtual_overseer,
view![ancestors[0]],
our_view![ancestors[0]],
&validator_public,
vec![ancestors[1]],
hashmap! { ancestors[0] => 1 },
@@ -879,7 +879,7 @@ fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() {
change_our_view(
&mut virtual_overseer,
view![current],
our_view![current],
&validator_public,
vec![ancestors[0]],
hashmap! { current => 1 },
@@ -1218,7 +1218,7 @@ fn new_peer_gets_all_chunks_send() {
change_our_view(
&mut virtual_overseer,
view![current],
our_view![current],
&validator_public,
vec![ancestors[0]],
hashmap! { current => 1 },
@@ -27,12 +27,12 @@ use futures::{channel::oneshot, FutureExt};
use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
jaeger,
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
PerLeafSpan, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext,
SubsystemResult,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange, OurView};
use std::collections::{HashMap, HashSet};
const COST_SIGNATURE_INVALID: ReputationChange =
@@ -79,21 +79,21 @@ impl BitfieldGossipMessage {
/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone, Debug)]
#[derive(Default, Debug)]
struct ProtocolState {
/// track all active peers and their views
/// to determine what is relevant to them.
peer_views: HashMap<PeerId, View>,
/// Our current view.
view: View,
view: OurView,
/// Additional data particular to a relay parent.
per_relay_parent: HashMap<Hash, PerRelayParentData>,
}
/// Data for a particular relay parent.
#[derive(Debug, Clone, Default)]
#[derive(Debug)]
struct PerRelayParentData {
/// Signing context for a particular relay parent.
signing_context: SigningContext,
@@ -113,9 +113,24 @@ struct PerRelayParentData {
/// Track messages that were already received by a peer
/// to prevent flooding.
message_received_from_peer: HashMap<PeerId, HashSet<ValidatorId>>,
/// The span for this leaf/relay parent.
span: PerLeafSpan,
}
impl PerRelayParentData {
/// Builds the data for a relay parent from its signing context, validator set
/// and span.
///
/// All remaining fields start out with their `Default` value.
fn new(signing_context: SigningContext, validator_set: Vec<ValidatorId>, span: PerLeafSpan) -> Self {
    Self {
        span,
        validator_set,
        signing_context,
        message_received_from_peer: Default::default(),
        message_sent_to_peer: Default::default(),
        one_per_validator: Default::default(),
    }
}
/// Determines if that particular message signed by a validator is needed by the given peer.
fn message_from_validator_needed_by_peer(
&self,
@@ -176,12 +191,13 @@ impl BitfieldDistribution {
// a network message was received
handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await;
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => {
let _timer = self.metrics.time_active_leaves_update();
for relay_parent in activated {
for (relay_parent, span) in activated {
tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "activated");
let _span = jaeger::hash_span(&relay_parent, "bitfield-dist:active_leaves:basics");
let span = PerLeafSpan::new(span, "bitfield-distribution");
let _span = span.child("query-basics");
// query validator set and signing context per relay_parent once only
match query_basics(&mut ctx, relay_parent).await {
@@ -193,11 +209,7 @@ impl BitfieldDistribution {
// us anything to do with this relay-parent anyway.
let _ = state.per_relay_parent.insert(
relay_parent,
PerRelayParentData {
signing_context,
validator_set,
..Default::default()
},
PerRelayParentData::new(signing_context, validator_set, span),
);
}
Err(e) => {
@@ -206,11 +218,6 @@ impl BitfieldDistribution {
_ => {},
}
}
for relay_parent in deactivated {
tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "deactivated");
// defer the cleanup to the view change
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
tracing::trace!(target: LOG_TARGET, hash = %hash, number = %number, "block finalized");
@@ -310,7 +317,7 @@ async fn relay_message<Context>(
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
let span = jaeger::hash_span(&message.relay_parent, "relay-msg");
let span = job_data.span.child("relay-msg");
let _span = span.child("provisionable");
// notify the overseer about a new and valid signed bitfield
@@ -398,6 +405,16 @@ where
return;
};
let mut _span = {
let mut span = job_data.span.child("msg-received");
span.add_string_tag("peer-id", &origin.to_base58());
span.add_string_tag(
"claimed-validator",
&message.signed_availability.validator_index().to_string(),
);
span
};
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
tracing::trace!(
@@ -495,16 +512,6 @@ where
NetworkBridgeEvent::PeerMessage(remote, message) => {
match message {
protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
let mut _span = {
let mut span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received");
span.add_string_tag("peer-id", &remote.to_base58());
span.add_string_tag(
"claimed-validator",
&format!("{}", bitfield.validator_index()),
);
span
};
tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer");
let gossiped_bitfield = BitfieldGossipMessage {
relay_parent,
@@ -519,7 +526,7 @@ where
/// Handle the changes necessary when our view changes.
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn handle_our_view_change(state: &mut ProtocolState, view: View) {
fn handle_our_view_change(state: &mut ProtocolState, view: OurView) {
let old_view = std::mem::replace(&mut (state.view), view);
for added in state.view.difference(&old_view) {
@@ -603,7 +610,7 @@ where
return;
};
let _span = jaeger::hash_span(&message.relay_parent, "gossip");
let _span = job_data.span.child("gossip");
job_data.message_sent_to_peer
.entry(dest.clone())
@@ -778,7 +785,8 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
use assert_matches::assert_matches;
use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
use polkadot_subsystem::JaegerSpan;
macro_rules! launch {
($fut:expr) => {
@@ -810,18 +818,19 @@ mod test {
},
message_received_from_peer: hashmap!{},
message_sent_to_peer: hashmap!{},
span: PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"),
},
},
peer_views: peers
.into_iter()
.map(|peer| (peer, view!(relay_parent)))
.collect(),
view: view!(relay_parent),
view: our_view!(relay_parent),
}
}
fn state_with_view(
view: View,
view: OurView,
relay_parent: Hash,
) -> (ProtocolState, SigningContext, SyncCryptoStorePtr, ValidatorId) {
let mut state = ProtocolState::default();
@@ -843,6 +852,7 @@ mod test {
one_per_validator: hashmap!{},
message_received_from_peer: hashmap!{},
message_sent_to_peer: hashmap!{},
span: PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"),
})
}).collect();
@@ -937,7 +947,7 @@ mod test {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash_a, hash_b], hash_a.clone());
state.peer_views.insert(peer_b.clone(), view![hash_a]);
@@ -995,7 +1005,7 @@ mod test {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash_a, hash_b], hash_a.clone());
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
@@ -1110,7 +1120,7 @@ mod test {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash.clone());
let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash.clone());
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
@@ -1206,7 +1216,7 @@ mod test {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash_a, hash_b], hash_a.clone());
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
@@ -1323,7 +1333,7 @@ mod test {
));
// we are not interested in any peers at all anymore
state.view = view![];
state.view = our_view![];
// on rx of the same message, since we are not interested,
// should give penalty
@@ -1365,7 +1375,7 @@ mod test {
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash);
let (mut state, signing_context, keystore, validator) = state_with_view(our_view![hash], hash);
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
+22 -21
View File
@@ -30,7 +30,7 @@ use sc_network::Event as NetworkEvent;
use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult,
SubsystemResult, JaegerSpan,
};
use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
@@ -39,7 +39,7 @@ use polkadot_subsystem::messages::{
};
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash, BlockNumber};
use polkadot_node_network_protocol::{
ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1
ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView,
};
use std::collections::{HashMap, hash_map};
@@ -47,7 +47,6 @@ use std::iter::ExactSizeIterator;
use std::pin::Pin;
use std::sync::Arc;
mod validator_discovery;
/// The maximum amount of heads a peer is allowed to have in their view at any time.
@@ -349,9 +348,9 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Action {
}
}
fn construct_view(live_heads: &[Hash], finalized_number: BlockNumber) -> View {
fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_number: BlockNumber) -> View {
View {
heads: live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect(),
heads: live_heads.rev().take(MAX_VIEW_HEADS).collect(),
finalized_number
}
}
@@ -360,13 +359,13 @@ fn construct_view(live_heads: &[Hash], finalized_number: BlockNumber) -> View {
async fn update_our_view(
net: &mut impl Network,
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
live_heads: &[Hash],
live_heads: &[(Hash, Arc<JaegerSpan>)],
local_view: &mut View,
finalized_number: BlockNumber,
validation_peers: &HashMap<PeerId, PeerData>,
collation_peers: &HashMap<PeerId, PeerData>,
) -> SubsystemResult<()> {
let new_view = construct_view(live_heads, finalized_number);
let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number);
if *local_view == new_view { return Ok(()) }
*local_view = new_view.clone();
@@ -380,18 +379,14 @@ async fn update_our_view(
send_collation_message(
net,
collation_peers.keys().cloned(),
WireMessage::ViewUpdate(new_view.clone()),
WireMessage::ViewUpdate(new_view),
).await?;
dispatch_validation_event_to_all(
NetworkBridgeEvent::OurViewChange(new_view.clone()),
ctx,
).await;
let our_view = OurView::new(live_heads.iter().cloned(), finalized_number);
dispatch_collation_event_to_all(
NetworkBridgeEvent::OurViewChange(new_view.clone()),
ctx,
).await;
dispatch_validation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view.clone()), ctx).await;
dispatch_collation_event_to_all(NetworkBridgeEvent::OurViewChange(our_view), ctx).await;
Ok(())
}
@@ -584,7 +579,7 @@ where
let mut event_stream = network_service.event_stream().fuse();
// Most recent heads are at the back.
let mut live_heads: Vec<Hash> = Vec::with_capacity(MAX_VIEW_HEADS);
let mut live_heads: Vec<(Hash, Arc<JaegerSpan>)> = Vec::with_capacity(MAX_VIEW_HEADS);
let mut local_view = View::default();
let mut finalized_number = 0;
@@ -642,7 +637,7 @@ where
Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
live_heads.extend(activated);
live_heads.retain(|h| !deactivated.contains(h));
live_heads.retain(|h| !deactivated.contains(&h.0));
update_our_view(
&mut network_service,
@@ -999,7 +994,9 @@ mod tests {
let hash_a = Hash::repeat_byte(1);
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a)))
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)),
))
).await;
let actions = network_handle.next_network_actions(2).await;
@@ -1187,7 +1184,9 @@ mod tests {
let hash_a = Hash::repeat_byte(1);
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a)))
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)),
))
).await;
let actions = network_handle.next_network_actions(1).await;
@@ -1378,7 +1377,9 @@ mod tests {
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_a, 1))
).await;
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_b)))
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(hash_b, Arc::new(JaegerSpan::Disabled)),
))
).await;
let actions = network_handle.next_network_actions(1).await;
@@ -20,7 +20,6 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
log = "0.4.11"
env_logger = "0.8.2"
assert_matches = "1.4.0"
smallvec = "1.5.1"
futures-timer = "3.0.2"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
@@ -24,16 +24,11 @@ use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId,
};
use polkadot_subsystem::{
jaeger,
jaeger, PerLeafSpan,
FromOverseer, OverseerSignal, SubsystemContext,
messages::{
AllMessages, CollatorProtocolMessage,
NetworkBridgeMessage,
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, NetworkBridgeEvent, RequestId,
messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage},
};
use polkadot_node_network_protocol::{v1 as protocol_v1, View, PeerId, NetworkBridgeEvent, RequestId, OurView};
use polkadot_node_subsystem_util::{
validator_discovery,
request_validators_ctx,
@@ -188,7 +183,10 @@ struct State {
peer_views: HashMap<PeerId, View>,
/// Our own view.
view: View,
view: OurView,
/// Span per relay parent.
span_per_relay_parent: HashMap<Hash, PerLeafSpan>,
/// Possessed collations.
///
@@ -431,7 +429,8 @@ async fn process_msg(
state.collating_on = Some(id);
}
DistributeCollation(receipt, pov) => {
let _span1 = jaeger::hash_span(&receipt.descriptor.relay_parent, "distributing-collation");
let _span1 = state.span_per_relay_parent
.get(&receipt.descriptor.relay_parent).map(|s| s.child("distributing-collation"));
let _span2 = jaeger::pov_span(&pov, "distributing-collation");
match state.collating_on {
Some(id) if receipt.descriptor.para_id != id => {
@@ -542,12 +541,12 @@ async fn handle_incoming_peer_message(
);
}
RequestCollation(request_id, relay_parent, para_id) => {
let _span = jaeger::hash_span(&relay_parent, "rx-collation-request");
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("request-collation"));
match state.collating_on {
Some(our_para_id) => {
if our_para_id == para_id {
if let Some(collation) = state.collations.get(&relay_parent).cloned() {
let _span = _span.child("sending");
let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(ctx, state, request_id, origin, collation.0, collation.1).await;
}
} else {
@@ -665,12 +664,13 @@ async fn handle_network_msg(
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change(
state: &mut State,
view: View,
view: OurView,
) -> Result<()> {
for removed in state.view.difference(&view) {
state.collations.remove(removed);
state.our_validators_groups.remove(removed);
state.connection_requests.remove(removed);
state.span_per_relay_parent.remove(removed);
}
state.view = view;
@@ -725,11 +725,10 @@ pub(crate) async fn run(
mod tests {
use super::*;
use std::time::Duration;
use std::{time::Duration, sync::Arc};
use assert_matches::assert_matches;
use futures::{executor, future, Future, channel::mpsc};
use smallvec::smallvec;
use sp_core::crypto::Pair;
use sp_keyring::Sr25519Keyring;
@@ -739,10 +738,10 @@ mod tests {
ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
SessionIndex, SessionInfo,
};
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}};
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::view;
use polkadot_node_network_protocol::{view, our_view};
#[derive(Default)]
struct TestCandidateBuilder {
@@ -888,17 +887,15 @@ mod tests {
self.relay_parent.randomize();
}
let hashes = if merge_views {
vec![old_relay_parent, self.relay_parent]
let our_view = if merge_views {
our_view![old_relay_parent, self.relay_parent]
} else {
vec![self.relay_parent]
our_view![self.relay_parent]
};
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(View { heads: hashes, finalized_number: 0 }),
),
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(our_view)),
).await;
}
}
@@ -997,15 +994,15 @@ mod tests {
overseer_signal(
virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![test_state.relay_parent],
deactivated: smallvec![],
activated: [(test_state.relay_parent, Arc::new(JaegerSpan::Disabled))][..].into(),
deactivated: [][..].into(),
}),
).await;
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent]),
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]),
),
).await;
}
@@ -14,9 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use std::task::Poll;
use std::{collections::{HashMap, HashSet}, time::Duration, task::Poll, sync::Arc};
use futures::{
StreamExt,
@@ -30,20 +28,16 @@ use polkadot_primitives::v1::{
Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV,
};
use polkadot_subsystem::{
jaeger,
jaeger, PerLeafSpan, JaegerSpan,
FromOverseer, OverseerSignal, SubsystemContext,
messages::{
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, RequestId,
NetworkBridgeEvent,
};
use polkadot_node_subsystem_util::{
TimeoutExt as _,
metrics::{self, prometheus},
v1 as protocol_v1, View, OurView, PeerId, ReputationChange as Rep, RequestId, NetworkBridgeEvent,
};
use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}};
use super::{modify_reputation, LOG_TARGET, Result};
@@ -172,7 +166,7 @@ struct PerRequest {
#[derive(Default)]
struct State {
/// Our own view.
view: View,
view: OurView,
/// Track all active collators and their views.
peer_views: HashMap<PeerId, View>,
@@ -215,6 +209,9 @@ struct State {
/// Metrics.
metrics: Metrics,
/// Span per relay parent.
span_per_relay_parent: HashMap<Hash, PerLeafSpan>,
}
/// Another subsystem has requested to fetch collations on a particular leaf for some para.
@@ -505,7 +502,7 @@ where
state.peer_views.entry(origin).or_default();
}
AdvertiseCollation(relay_parent, para_id) => {
let _span = jaeger::hash_span(&relay_parent, "advertising-collation");
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation"));
state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent));
if let Some(collator) = state.known_collators.get(&origin) {
@@ -517,7 +514,8 @@ where
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
Collation(request_id, receipt, pov) => {
let _span1 = jaeger::hash_span(&receipt.descriptor.relay_parent, "received-collation");
let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
.map(|s| s.child("received-collation"));
let _span2 = jaeger::pov_span(&pov, "received-collation");
received_collation(ctx, state, origin, request_id, receipt, pov).await;
}
@@ -556,9 +554,19 @@ async fn remove_relay_parent(
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change(
state: &mut State,
view: View,
view: OurView,
) -> Result<()> {
let old_view = std::mem::replace(&mut (state.view), view);
let old_view = std::mem::replace(&mut state.view, view);
let added: HashMap<Hash, Arc<JaegerSpan>> = state.view
.span_per_head()
.iter()
.filter(|v| !old_view.contains(&v.0))
.map(|v| (v.0.clone(), v.1.clone()))
.collect();
added.into_iter().for_each(|(h, s)| {
state.span_per_relay_parent.insert(h, PerLeafSpan::new(s, "validator-side"));
});
let removed = old_view
.difference(&state.view)
@@ -571,6 +579,7 @@ async fn handle_our_view_change(
for removed in removed.into_iter() {
state.recently_removed_heads.insert(removed.clone());
remove_relay_parent(state, removed).await?;
state.span_per_relay_parent.remove(&removed);
}
Ok(())
@@ -663,7 +672,7 @@ where
);
}
FetchCollation(relay_parent, collator_id, para_id, tx) => {
let _span = jaeger::hash_span(&relay_parent, "fetching-collation");
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("fetch-collation"));
fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await;
}
ReportCollator(id) => {
@@ -760,7 +769,7 @@ mod tests {
use polkadot_primitives::v1::{BlockData, CollatorPair};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::view;
use polkadot_node_network_protocol::our_view;
#[derive(Clone)]
struct TestState {
@@ -873,7 +882,7 @@ mod tests {
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent])
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent])
)
).await;
@@ -931,7 +940,7 @@ mod tests {
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent])
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent])
)
).await;
@@ -1022,7 +1031,7 @@ mod tests {
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![Hash::repeat_byte(0x42)])
NetworkBridgeEvent::OurViewChange(our_view![Hash::repeat_byte(0x42)])
)
).await;
@@ -1050,7 +1059,7 @@ mod tests {
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent])
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent])
)
).await;
@@ -1134,8 +1143,8 @@ mod tests {
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![test_state.relay_parent])
)
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent])
),
).await;
let peer_b = PeerId::random();
@@ -19,7 +19,6 @@ polkadot-node-network-protocol = { path = "../../network/protocol" }
assert_matches = "1.4.0"
env_logger = "0.8.1"
log = "0.4.11"
smallvec = "1.5.1"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -40,7 +40,7 @@ use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View,
v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, OurView,
};
use futures::prelude::*;
@@ -96,7 +96,7 @@ struct State {
peer_state: HashMap<PeerId, PeerState>,
/// Our own view.
our_view: View,
our_view: OurView,
/// Connect to relevant groups of validators at different relay parents.
connection_requests: validator_discovery::ConnectionRequests,
@@ -152,8 +152,8 @@ async fn handle_signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
let _timer = state.metrics.time_handle_signal();
for relay_parent in activated {
match request_validators_ctx(relay_parent.clone(), ctx).await {
for (relay_parent, _span) in activated {
match request_validators_ctx(relay_parent, ctx).await {
Ok(vals_rx) => {
let n_validators = match vals_rx.await? {
Ok(v) => v.len(),
@@ -1,11 +1,26 @@
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use std::time::Duration;
use std::{time::Duration, sync::Arc};
use assert_matches::assert_matches;
use futures::executor;
use tracing::trace;
use smallvec::smallvec;
use sp_keyring::Sr25519Keyring;
@@ -13,10 +28,10 @@ use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId,
ScheduledCore, ValidatorIndex, SessionIndex, SessionInfo,
};
use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest};
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan};
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_network_protocol::view;
use polkadot_node_network_protocol::{view, our_view};
fn make_pov(data: Vec<u8>) -> PoV {
PoV { block_data: BlockData(data) }
@@ -261,8 +276,8 @@ fn ask_validators_for_povs() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![test_state.relay_parent.clone()],
deactivated: smallvec![],
activated: [(test_state.relay_parent, Arc::new(JaegerSpan::Disabled))][..].into(),
deactivated: [][..].into(),
}),
).await;
@@ -429,8 +444,8 @@ fn ask_validators_for_povs() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![next_leaf.clone()],
deactivated: smallvec![current.clone()],
activated: [(next_leaf, Arc::new(JaegerSpan::Disabled))][..].into(),
deactivated: [current.clone()][..].into(),
})
).await;
@@ -583,7 +598,7 @@ fn distributes_to_those_awaiting_and_completes_local() {
s
},
our_view: view![hash_a, hash_b],
our_view: our_view![hash_a, hash_b],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -666,7 +681,7 @@ fn we_inform_peers_with_same_view_we_are_awaiting() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -840,7 +855,7 @@ fn peer_view_change_leads_to_us_informing() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -913,7 +928,7 @@ fn peer_complete_fetch_and_is_rewarded() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -1003,7 +1018,7 @@ fn peer_punished_for_sending_bad_pov() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -1068,7 +1083,7 @@ fn peer_punished_for_sending_unexpected_pov() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -1131,7 +1146,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -1191,7 +1206,7 @@ fn peer_reported_for_awaiting_too_much() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -1278,7 +1293,7 @@ fn peer_reported_for_awaiting_outside_their_view() {
s
},
our_view: view![hash_a, hash_b],
our_view: our_view![hash_a, hash_b],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -1342,7 +1357,7 @@ fn peer_reported_for_awaiting_outside_our_view() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -1421,7 +1436,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -1505,7 +1520,7 @@ fn peer_completing_request_no_longer_awaiting() {
s
},
our_view: view![hash_a],
our_view: our_view![hash_a],
metrics: Default::default(),
connection_requests: Default::default(),
};
@@ -8,5 +8,6 @@ description = "Primitives types for the Node-side"
[dependencies]
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-jaeger = { path = "../../jaeger" }
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
+80 -9
View File
@@ -21,10 +21,13 @@
use polkadot_primitives::v1::{Hash, BlockNumber};
use parity_scale_codec::{Encode, Decode};
use std::convert::TryFrom;
use std::fmt;
use std::{convert::TryFrom, fmt, collections::HashMap};
pub use sc_network::{ReputationChange, PeerId};
#[doc(hidden)]
pub use polkadot_node_jaeger::JaegerSpan;
#[doc(hidden)]
pub use std::sync::Arc;
/// A unique identifier of a request.
pub type RequestId = u64;
@@ -44,7 +47,6 @@ impl fmt::Display for WrongVariant {
impl std::error::Error for WrongVariant {}
/// The peer-sets that the network manages. Different subsystems will use different peer-sets.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PeerSet {
@@ -103,8 +105,8 @@ pub enum NetworkBridgeEvent<M> {
/// Peer's `View` has changed.
PeerViewChange(PeerId, View),
/// Our `View` has changed.
OurViewChange(View),
/// Our view has changed.
OurViewChange(OurView),
}
macro_rules! impl_try_from {
@@ -159,6 +161,72 @@ impl<M> NetworkBridgeEvent<M> {
}
}
/// Specialized wrapper around [`View`].
///
/// Besides the access to the view itself, it also gives access to the [`JaegerSpan`] per leaf/head.
#[derive(Debug, Clone, Default)]
pub struct OurView {
/// The wrapped plain view (heads and finalized number).
view: View,
/// The span belonging to each head, keyed by the hash of the head.
span_per_head: HashMap<Hash, Arc<JaegerSpan>>,
}
impl OurView {
	/// Creates a new instance.
	///
	/// The heads of the wrapped [`View`] are stored in the order in which they are first yielded by
	/// `heads`. If the same hash is yielded more than once, it appears only once in the view and the
	/// span that was yielded last for it is the one that is kept.
	pub fn new(heads: impl IntoIterator<Item = (Hash, Arc<JaegerSpan>)>, finalized_number: BlockNumber) -> Self {
		let mut span_per_head = HashMap::new();
		let mut ordered_heads = Vec::new();

		for (hash, span) in heads {
			// Do not derive the heads from the map's keys: the iteration order of a `HashMap` is
			// unspecified and may differ between two maps with the same content, which would make
			// the order of `View::heads` (and any order-sensitive comparison of it) nondeterministic.
			// `insert` returns `None` only the first time a hash is seen, so every head is recorded
			// exactly once while a later span still replaces an earlier one.
			if span_per_head.insert(hash.clone(), span).is_none() {
				ordered_heads.push(hash);
			}
		}

		Self {
			view: View {
				heads: ordered_heads,
				finalized_number,
			},
			span_per_head,
		}
	}

	/// Returns the span per head map.
	///
	/// For each head there exists one span in this map.
	pub fn span_per_head(&self) -> &HashMap<Hash, Arc<JaegerSpan>> {
		&self.span_per_head
	}
}
impl PartialEq for OurView {
	/// Two instances are equal when their wrapped [`View`]s are equal.
	///
	/// The spans are not part of the comparison.
	fn eq(&self, other: &Self) -> bool {
		let (lhs, rhs) = (&self.view, &other.view);
		lhs == rhs
	}
}
impl std::ops::Deref for OurView {
	type Target = View;

	/// Borrows the wrapped [`View`].
	fn deref(&self) -> &Self::Target {
		let Self { view, .. } = self;
		view
	}
}
/// Construct a new [`OurView`] with the given chain heads, finalized number 0 and a disabled
/// [`JaegerSpan`] for every head.
///
/// Each given expression is `.clone()`d. A trailing comma is accepted.
///
/// NOTE: Use for tests only.
///
/// # Example
///
/// ```
/// # use polkadot_node_network_protocol::our_view;
/// # use polkadot_primitives::v1::Hash;
/// let our_view = our_view![Hash::repeat_byte(1), Hash::repeat_byte(2)];
/// ```
#[macro_export]
macro_rules! our_view {
( $( $hash:expr ),* $(,)? ) => {
// All paths go through `$crate`, so the expansion does not rely on names imported at the call site.
$crate::OurView::new(
// Pair every head with its own freshly allocated, disabled span.
vec![ $( $hash.clone() ),* ].into_iter().map(|h| (h, $crate::Arc::new($crate::JaegerSpan::Disabled))),
// Finalized number.
0,
)
};
}
/// A succinct representation of a peer's view. This consists of a bounded amount of chain heads
/// and the highest known finalized block number.
///
@@ -171,18 +239,21 @@ pub struct View {
pub finalized_number: BlockNumber,
}
/// Construct a new view with the given chain heads and finalized number 0.
///
/// NOTE: Use for tests only.
///
/// # Example
///
/// ```ignore
/// view![Hash::repeat_byte(1), Hash::repeat_byte(2)]
/// ```
/// # use polkadot_node_network_protocol::view;
/// # use polkadot_primitives::v1::Hash;
/// let view = view![Hash::repeat_byte(1), Hash::repeat_byte(2)];
/// ```
#[macro_export]
macro_rules! view {
( $( $hash:expr ),* $(,)? ) => {
View { heads: vec![ $( $hash.clone() ),* ], finalized_number: 0 }
$crate::View { heads: vec![ $( $hash.clone() ),* ], finalized_number: 0 }
};
}
@@ -23,9 +23,8 @@
#![warn(missing_docs)]
use polkadot_subsystem::{
jaeger,
Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem,
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan,
messages::{
AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage,
RuntimeApiMessage, RuntimeApiRequest,
@@ -37,7 +36,7 @@ use polkadot_primitives::v1::{
Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, CandidateHash,
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent,
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent, OurView,
};
use futures::prelude::*;
@@ -390,14 +389,14 @@ struct ActiveHeadData {
/// How many `Seconded` statements we've seen per validator.
seconded_counts: HashMap<ValidatorIndex, usize>,
/// A Jaeger span for this head, so we can attach data to it.
span: jaeger::JaegerSpan,
span: PerLeafSpan,
}
impl ActiveHeadData {
fn new(
validators: Vec<ValidatorId>,
session_index: sp_staking::SessionIndex,
relay_parent: &Hash,
span: PerLeafSpan,
) -> Self {
ActiveHeadData {
candidates: Default::default(),
@@ -405,7 +404,7 @@ impl ActiveHeadData {
validators,
session_index,
seconded_counts: Default::default(),
span: jaeger::hash_span(&relay_parent, "statement-dist-active"),
span,
}
}
@@ -839,7 +838,7 @@ async fn handle_network_update(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
our_view: &mut View,
our_view: &mut OurView,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
@@ -930,7 +929,7 @@ impl StatementDistribution {
mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>,
) -> SubsystemResult<()> {
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = View::default();
let mut our_view = OurView::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
let mut statement_listeners = StatementListeners::new();
let metrics = self.metrics;
@@ -941,7 +940,9 @@ impl StatementDistribution {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. })) => {
let _timer = metrics.time_active_leaves_update();
for relay_parent in activated {
for (relay_parent, span) in activated {
let span = PerLeafSpan::new(span, "statement-distribution");
let (validators, session_index) = {
let (val_tx, val_rx) = oneshot::channel();
let (session_tx, session_rx) = oneshot::channel();
@@ -981,7 +982,7 @@ impl StatementDistribution {
};
active_heads.entry(relay_parent)
.or_insert(ActiveHeadData::new(validators, session_index, &relay_parent));
.or_insert(ActiveHeadData::new(validators, session_index, span));
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {
@@ -1117,7 +1118,8 @@ mod tests {
use futures::executor::{self, block_on};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
use sc_keystore::LocalKeystore;
use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
use polkadot_subsystem::JaegerSpan;
#[test]
fn active_head_accepts_only_2_seconded_per_validator() {
@@ -1155,7 +1157,11 @@ mod tests {
c
};
let mut head_data = ActiveHeadData::new(validators, session_index, &parent_hash);
let mut head_data = ActiveHeadData::new(
validators,
session_index,
PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"),
);
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = SyncCryptoStore::sr25519_generate_new(
@@ -1413,7 +1419,11 @@ mod tests {
).unwrap();
let new_head_data = {
let mut data = ActiveHeadData::new(validators, session_index, &hash_c);
let mut data = ActiveHeadData::new(
validators,
session_index,
PerLeafSpan::new(Arc::new(JaegerSpan::Disabled), "test"),
);
let noted = data.note_statement(block_on(SignedFullStatement::sign(
&keystore,
@@ -1665,7 +1675,7 @@ mod tests {
let test_fut = async move {
// register our active heads.
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![hash_a].into(),
activated: vec![(hash_a, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}))).await;
@@ -1718,7 +1728,7 @@ mod tests {
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![hash_a])
NetworkBridgeEvent::OurViewChange(our_view![hash_a])
)
}).await;
+37 -32
View File
@@ -88,12 +88,11 @@ use polkadot_subsystem::messages::{
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem,
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, JaegerSpan, jaeger,
};
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}};
use polkadot_node_primitives::SpawnNamed;
// A capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
// A graceful `Overseer` teardown time delay.
@@ -490,6 +489,9 @@ pub struct Overseer<S> {
/// External listeners waiting for a hash to be in the active-leave set.
activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,
/// Stores the [`JaegerSpan`] per active leaf.
span_per_active_leaf: HashMap<Hash, Arc<JaegerSpan>>,
/// A set of leaves that `Overseer` starts working with.
///
/// Drained at the beginning of `run` and never used again.
@@ -1277,6 +1279,7 @@ where
leaves,
active_leaves,
metrics,
span_per_active_leaf: Default::default(),
};
Ok((this, handler))
@@ -1321,9 +1324,9 @@ where
let mut update = ActiveLeavesUpdate::default();
for (hash, number) in std::mem::take(&mut self.leaves) {
update.activated.push(hash);
let _ = self.active_leaves.insert(hash, number);
self.on_head_activated(&hash);
let span = self.on_head_activated(&hash);
update.activated.push((hash, span));
}
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
@@ -1390,32 +1393,26 @@ where
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
let mut update = ActiveLeavesUpdate::default();
match self.active_leaves.entry(block.hash) {
hash_map::Entry::Vacant(entry) => entry.insert(block.number),
hash_map::Entry::Occupied(entry) => {
debug_assert_eq!(*entry.get(), block.number);
return Ok(());
}
};
let span = self.on_head_activated(&block.hash);
let mut update = ActiveLeavesUpdate::start_work(block.hash, span);
if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
if let Some(expected_parent_number) = block.number.checked_sub(1) {
debug_assert_eq!(expected_parent_number, number);
}
debug_assert_eq!(block.number.saturating_sub(1), number);
update.deactivated.push(block.parent_hash);
self.on_head_deactivated(&block.parent_hash);
}
match self.active_leaves.entry(block.hash) {
hash_map::Entry::Vacant(entry) => {
update.activated.push(block.hash);
let _ = entry.insert(block.number);
self.on_head_activated(&block.hash);
},
hash_map::Entry::Occupied(entry) => {
debug_assert_eq!(*entry.get(), block.number);
}
}
self.clean_up_external_listeners();
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
Ok(())
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
@@ -1519,7 +1516,7 @@ where
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn on_head_activated(&mut self, hash: &Hash) {
fn on_head_activated(&mut self, hash: &Hash) -> Arc<JaegerSpan> {
self.metrics.on_head_activated();
if let Some(listeners) = self.activation_external_listeners.remove(hash) {
for listener in listeners {
@@ -1527,15 +1524,17 @@ where
let _ = listener.send(Ok(()));
}
}
let span = Arc::new(jaeger::hash_span(hash, "leave activated"));
self.span_per_active_leaf.insert(*hash, span.clone());
span
}
/// Bookkeeping for a leaf identified by `hash` that is no longer active.
///
/// Bumps the deactivation metric, discards any external listeners that were
/// still waiting for the leaf to be activated and releases the overseer's
/// handle on the leaf's Jaeger span.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn on_head_deactivated(&mut self, hash: &Hash) {
    self.metrics.on_head_deactivated();
    // Clean up and signal to listeners the block is deactivated: the removed
    // entry is dropped right here. A single `remove` suffices; looking the key
    // up a second time can never find anything.
    // NOTE(review): presumably the listeners are oneshot senders, so dropping
    // them is what notifies the receiving side -- confirm against the field type.
    self.activation_external_listeners.remove(hash);
    // The span is only meant to be kept while the leaf is considered active.
    self.span_per_active_leaf.remove(hash);
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
@@ -1615,7 +1614,7 @@ mod tests {
use futures::{executor, pin_mut, select, channel::mpsc, FutureExt, pending};
use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash};
use polkadot_subsystem::messages::RuntimeApiRequest;
use polkadot_subsystem::{messages::RuntimeApiRequest, JaegerSpan};
use polkadot_node_primitives::{Collation, CollationGenerationConfig};
use polkadot_node_network_protocol::{PeerId, ReputationChange, NetworkBridgeEvent};
@@ -1980,13 +1979,16 @@ mod tests {
handler.block_imported(third_block).await;
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(first_block_hash)),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
first_block_hash,
Arc::new(JaegerSpan::Disabled),
)),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [second_block_hash].as_ref().into(),
activated: [(second_block_hash, Arc::new(JaegerSpan::Disabled))].as_ref().into(),
deactivated: [first_block_hash].as_ref().into(),
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [third_block_hash].as_ref().into(),
activated: [(third_block_hash, Arc::new(JaegerSpan::Disabled))].as_ref().into(),
deactivated: [second_block_hash].as_ref().into(),
}),
];
@@ -2074,7 +2076,10 @@ mod tests {
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [first_block_hash, second_block_hash].as_ref().into(),
activated: [
(first_block_hash, Arc::new(JaegerSpan::Disabled)),
(second_block_hash, Arc::new(JaegerSpan::Disabled)),
].as_ref().into(),
..Default::default()
}),
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+1
View File
@@ -19,6 +19,7 @@ tracing-futures = "0.2.4"
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem = { path = "../subsystem" }
polkadot-node-jaeger = { path = "../jaeger" }
polkadot-primitives = { path = "../../primitives" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
+22 -26
View File
@@ -29,6 +29,7 @@ use polkadot_node_subsystem::{
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender, BoundToRelayParent},
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
};
use polkadot_node_jaeger::JaegerSpan;
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
use futures_timer::Delay;
use parity_scale_codec::Encode;
@@ -36,27 +37,14 @@ use pin_project::pin_project;
use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption,
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
SessionInfo,
};
use sp_core::{
traits::SpawnNamed,
Public
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, SessionInfo,
};
use sp_core::{traits::SpawnNamed, Public};
use sp_application_crypto::AppKey;
use sp_keystore::{
CryptoStore,
SyncCryptoStorePtr,
Error as KeystoreError,
};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError};
use std::{
collections::{HashMap, hash_map::Entry},
convert::{TryFrom, TryInto},
marker::Unpin,
pin::Pin,
task::{Poll, Context},
time::Duration,
fmt,
collections::{HashMap, hash_map::Entry}, convert::{TryFrom, TryInto}, marker::Unpin, pin::Pin, task::{Poll, Context},
time::Duration, fmt, sync::Arc,
};
use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error;
@@ -494,6 +482,7 @@ pub trait JobTrait: Unpin {
/// The job should be ended when `receiver` returns `None`.
fn run(
parent: Hash,
span: Arc<JaegerSpan>,
run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<Self::ToJob>,
@@ -561,14 +550,20 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
}
/// Spawn a new job for this `parent_hash`, with whatever args are appropriate.
fn spawn_job(&mut self, parent_hash: Hash, run_args: Job::RunArgs, metrics: Job::Metrics) -> Result<(), Error> {
fn spawn_job(
&mut self,
parent_hash: Hash,
span: Arc<JaegerSpan>,
run_args: Job::RunArgs,
metrics: Job::Metrics,
) -> Result<(), Error> {
let (to_job_tx, to_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let (from_job_tx, from_job_rx) = mpsc::channel(JOB_CHANNEL_CAPACITY);
let err_tx = self.errors.clone();
let (future, abort_handle) = future::abortable(async move {
if let Err(e) = Job::run(parent_hash, run_args, metrics, to_job_rx, from_job_tx).await {
if let Err(e) = Job::run(parent_hash, span, run_args, metrics, to_job_rx, from_job_tx).await {
tracing::error!(
job = Job::NAME,
parent_hash = %parent_hash,
@@ -782,9 +777,9 @@ where
activated,
deactivated,
}))) => {
for hash in activated {
for (hash, span) in activated {
let metrics = metrics.clone();
if let Err(e) = jobs.spawn_job(hash, run_args.clone(), metrics) {
if let Err(e) = jobs.spawn_job(hash, span, run_args.clone(), metrics) {
tracing::error!(
job = Job::NAME,
err = ?e,
@@ -998,13 +993,13 @@ mod tests {
use thiserror::Error;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateSelectionMessage}, ActiveLeavesUpdate, FromOverseer, OverseerSignal,
SpawnedSubsystem,
SpawnedSubsystem, JaegerSpan,
};
use assert_matches::assert_matches;
use futures::{channel::mpsc, executor, StreamExt, future, Future, FutureExt, SinkExt};
use polkadot_primitives::v1::Hash;
use polkadot_node_subsystem_test_helpers::{self as test_helpers, make_subsystem_context};
use std::{pin::Pin, time::Duration};
use std::{pin::Pin, time::Duration, sync::Arc};
// basic usage: in a nutshell, when you want to define a subsystem, just focus on what its jobs do;
// you can leave the subsystem itself to the job manager.
@@ -1040,6 +1035,7 @@ mod tests {
// this function is in charge of creating and executing the job's main loop
fn run(
_: Hash,
_: Arc<JaegerSpan>,
run_args: Self::RunArgs,
_metrics: Self::Metrics,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
@@ -1123,7 +1119,7 @@ mod tests {
test_harness(true, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent),
ActiveLeavesUpdate::start_work(relay_parent, Arc::new(JaegerSpan::Disabled)),
)))
.await;
assert_matches!(
@@ -1152,7 +1148,7 @@ mod tests {
test_harness(true, |mut overseer_handle, err_rx| async move {
overseer_handle
.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(relay_parent),
ActiveLeavesUpdate::start_work(relay_parent, Arc::new(JaegerSpan::Disabled)),
)))
.await;
+1
View File
@@ -22,6 +22,7 @@ polkadot-node-primitives = { path = "../primitives" }
polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-primitives = { path = "../../primitives" }
polkadot-statement-table = { path = "../../statement-table" }
polkadot-node-jaeger = { path = "../jaeger" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
smallvec = "1.5.1"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
-12
View File
@@ -59,15 +59,3 @@ impl core::fmt::Display for ChainApiError {
}
impl std::error::Error for ChainApiError {}
/// Errors that can be reported when setting up Jaeger tracing.
///
/// The variants cover the collector thread having been launched already and
/// the Jaeger configuration being absent.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum JaegerError {
// The collector thread was launched before; it is not launched a second time.
#[error("Already launched the collector thread")]
AlreadyLaunched,
// No Jaeger configuration was available.
#[error("Missing jaeger configuration")]
MissingConfiguration,
}
+30 -10
View File
@@ -22,7 +22,7 @@
#![warn(missing_docs)]
use std::pin::Pin;
use std::{pin::Pin, sync::Arc, fmt};
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
@@ -36,8 +36,9 @@ use crate::messages::AllMessages;
pub mod errors;
pub mod messages;
pub mod jaeger;
pub use crate::jaeger::*;
pub use polkadot_node_jaeger as jaeger;
pub use jaeger::*;
/// How many slots are stack-reserved for active leaves updates
///
@@ -48,18 +49,21 @@ const ACTIVE_LEAVES_SMALLVEC_CAPACITY: usize = 8;
/// Changes in the set of active leaves: the parachain heads which we care to work on.
///
/// Note that the activated and deactivated fields indicate deltas, not complete sets.
#[derive(Clone, Debug, Default, Eq)]
#[derive(Clone, Default)]
pub struct ActiveLeavesUpdate {
/// New relay chain block hashes of interest.
pub activated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
/// New relay chain block hashes of interest and their associated [`JaegerSpan`].
///
/// NOTE: Each span should only be kept active as long as the leaf is considered active and should be dropped
/// when the leaf is deactivated.
pub activated: SmallVec<[(Hash, Arc<JaegerSpan>); ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
/// Relay chain block hashes no longer of interest.
pub deactivated: SmallVec<[Hash; ACTIVE_LEAVES_SMALLVEC_CAPACITY]>,
}
impl ActiveLeavesUpdate {
/// Create an `ActiveLeavesUpdate` with a single activated hash.
pub fn start_work(hash: Hash) -> Self {
Self { activated: [hash][..].into(), ..Default::default() }
pub fn start_work(hash: Hash, span: Arc<JaegerSpan>) -> Self {
Self { activated: [(hash, span)][..].into(), ..Default::default() }
}
/// Create an `ActiveLeavesUpdate` with a single deactivated hash.
@@ -79,11 +83,27 @@ impl PartialEq for ActiveLeavesUpdate {
/// Instead, it means equality when `activated` and `deactivated` are considered as sets.
fn eq(&self, other: &Self) -> bool {
self.activated.len() == other.activated.len() && self.deactivated.len() == other.deactivated.len()
&& self.activated.iter().all(|a| other.activated.contains(a))
&& self.activated.iter().all(|a| other.activated.iter().any(|o| a.0 == o.0))
&& self.deactivated.iter().all(|a| other.deactivated.contains(a))
}
}
impl fmt::Debug for ActiveLeavesUpdate {
    /// Formats the update as a struct with `activated` and `deactivated` fields.
    ///
    /// Only the hashes of the activated leaves are printed; the span attached to
    /// each of them is left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        /// Borrowed view over the activated leaves that debug-prints the hashes only.
        struct HashesOnly<'a>(&'a [(Hash, Arc<JaegerSpan>)]);

        impl fmt::Debug for HashesOnly<'_> {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
                let mut list = f.debug_list();
                for (hash, _span) in self.0 {
                    list.entry(hash);
                }
                list.finish()
            }
        }

        let activated = HashesOnly(&self.activated);
        let mut out = f.debug_struct("ActiveLeavesUpdate");
        out.field("activated", &activated);
        out.field("deactivated", &self.deactivated);
        out.finish()
    }
}
/// Signals sent by an overseer to a subsystem.
#[derive(PartialEq, Clone, Debug)]
pub enum OverseerSignal {
@@ -139,7 +159,7 @@ pub enum SubsystemError {
Prometheus(#[from] substrate_prometheus_endpoint::PrometheusError),
#[error(transparent)]
Jaeger(#[from] errors::JaegerError),
Jaeger(#[from] JaegerError),
#[error("Failed to {0}")]
Context(String),
+1 -1
View File
@@ -105,7 +105,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
spec_name: create_runtime_str!("rococo"),
impl_name: create_runtime_str!("parity-rococo-v1"),
authoring_version: 0,
spec_version: 13,
spec_version: 14,
impl_version: 0,
#[cfg(not(feature = "disable-runtime-api"))]
apis: RUNTIME_API_VERSIONS,