Add tracing support to node (#1940)

* drop in tracing to replace log

* add structured logging to trace messages

* add structured logging to debug messages

* add structured logging to info messages

* add structured logging to warn messages

* add structured logging to error messages

* normalize spacing and Display vs Debug

* add instrumentation to the various 'fn run'

* use explicit tracing module throughout

* fix availability distribution test

* don't double-print errors

* remove further redundancy from logs

* fix test errors

* fix more test errors

* remove unused kv_log_macro

* fix unused variable

* add tracing spans to collation generation

* add tracing spans to av-store

* add tracing spans to backing

* add tracing spans to bitfield-signing

* add tracing spans to candidate-selection

* add tracing spans to candidate-validation

* add tracing spans to chain-api

* add tracing spans to provisioner

* add tracing spans to runtime-api

* add tracing spans to availability-distribution

* add tracing spans to bitfield-distribution

* add tracing spans to network-bridge

* add tracing spans to collator-protocol

* add tracing spans to pov-distribution

* add tracing spans to statement-distribution

* add tracing spans to overseer

* cleanup
This commit is contained in:
Peter Goodspeed-Niklaus
2020-11-20 12:02:04 +01:00
committed by GitHub
parent 94670d8082
commit e49989971d
53 changed files with 564 additions and 280 deletions
+44 -15
View File
@@ -4942,6 +4942,8 @@ dependencies = [
"sp-core",
"sp-keystore",
"tempfile",
"tracing",
"tracing-futures",
]
[[package]]
@@ -4967,6 +4969,8 @@ dependencies = [
"sp-keyring",
"sp-keystore",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -4986,6 +4990,7 @@ dependencies = [
"substrate-browser-utils",
"substrate-build-script-utils",
"thiserror",
"tracing-futures",
"wasm-bindgen",
"wasm-bindgen-futures",
]
@@ -5008,6 +5013,8 @@ dependencies = [
"sp-core",
"sp-keyring",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5039,7 +5046,6 @@ dependencies = [
"assert_matches",
"async-trait",
"futures 0.3.8",
"log",
"parity-scale-codec",
"parking_lot 0.11.1",
"polkadot-node-network-protocol",
@@ -5050,6 +5056,8 @@ dependencies = [
"sc-network",
"sp-core",
"sp-keyring",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5057,7 +5065,6 @@ name = "polkadot-node-collation-generation"
version = "0.1.0"
dependencies = [
"futures 0.3.8",
"log",
"polkadot-erasure-coding",
"polkadot-node-primitives",
"polkadot-node-subsystem",
@@ -5066,6 +5073,8 @@ dependencies = [
"polkadot-primitives",
"sp-core",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5091,6 +5100,8 @@ dependencies = [
"smallvec 1.5.0",
"sp-core",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5100,7 +5111,6 @@ dependencies = [
"assert_matches",
"bitvec",
"futures 0.3.8",
"log",
"polkadot-erasure-coding",
"polkadot-node-primitives",
"polkadot-node-subsystem",
@@ -5114,6 +5124,8 @@ dependencies = [
"sp-keyring",
"sp-keystore",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5122,12 +5134,13 @@ version = "0.1.0"
dependencies = [
"derive_more",
"futures 0.3.8",
"log",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-keystore",
"thiserror",
"tracing",
"tracing-futures",
"wasm-timer",
]
@@ -5136,12 +5149,13 @@ name = "polkadot-node-core-candidate-selection"
version = "0.1.0"
dependencies = [
"futures 0.3.8",
"log",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-core",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5150,7 +5164,6 @@ version = "0.1.0"
dependencies = [
"assert_matches",
"futures 0.3.8",
"log",
"parity-scale-codec",
"polkadot-node-primitives",
"polkadot-node-subsystem",
@@ -5160,6 +5173,8 @@ dependencies = [
"polkadot-primitives",
"sp-core",
"sp-keyring",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5174,6 +5189,8 @@ dependencies = [
"polkadot-primitives",
"sp-blockchain",
"sp-core",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5182,7 +5199,6 @@ version = "0.1.0"
dependencies = [
"futures 0.3.8",
"futures-timer 3.0.2",
"log",
"polkadot-node-subsystem",
"polkadot-overseer",
"polkadot-primitives",
@@ -5197,6 +5213,7 @@ dependencies = [
"sp-runtime",
"sp-transaction-pool",
"substrate-prometheus-endpoint",
"tracing",
]
[[package]]
@@ -5206,13 +5223,14 @@ dependencies = [
"bitvec",
"futures 0.3.8",
"futures-timer 3.0.2",
"log",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-application-crypto",
"sp-keystore",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5226,6 +5244,8 @@ dependencies = [
"polkadot-primitives",
"sp-api",
"sp-core",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5259,7 +5279,6 @@ dependencies = [
"derive_more",
"futures 0.3.8",
"futures-timer 3.0.2",
"log",
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.2",
@@ -5273,6 +5292,8 @@ dependencies = [
"sp-core",
"substrate-prometheus-endpoint",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5282,7 +5303,6 @@ dependencies = [
"async-trait",
"futures 0.3.8",
"futures-timer 3.0.2",
"log",
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.2",
@@ -5295,6 +5315,8 @@ dependencies = [
"sc-network",
"smallvec 1.5.0",
"sp-core",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5321,6 +5343,8 @@ dependencies = [
"streamunordered",
"substrate-prometheus-endpoint",
"thiserror",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5332,7 +5356,6 @@ dependencies = [
"futures 0.3.8",
"futures-timer 3.0.2",
"kv-log-macro",
"log",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
@@ -5341,6 +5364,8 @@ dependencies = [
"sc-client-api",
"sp-core",
"streamunordered",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5371,13 +5396,14 @@ version = "0.1.0"
dependencies = [
"assert_matches",
"futures 0.3.8",
"log",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-core",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5613,7 +5639,6 @@ dependencies = [
"hex-literal",
"kusama-runtime",
"lazy_static",
"log",
"pallet-babe",
"pallet-im-online",
"pallet-staking",
@@ -5679,6 +5704,8 @@ dependencies = [
"sp-transaction-pool",
"sp-trie",
"substrate-prometheus-endpoint",
"tracing",
"tracing-futures",
"westend-runtime",
]
@@ -5690,7 +5717,6 @@ dependencies = [
"assert_matches",
"futures 0.3.8",
"indexmap",
"log",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
@@ -5703,6 +5729,8 @@ dependencies = [
"sp-keyring",
"sp-keystore",
"sp-staking",
"tracing",
"tracing-futures",
]
[[package]]
@@ -5806,7 +5834,6 @@ dependencies = [
"futures 0.1.30",
"futures 0.3.8",
"hex",
"log",
"pallet-balances",
"pallet-staking",
"pallet-transaction-payment",
@@ -5848,6 +5875,8 @@ dependencies = [
"substrate-test-utils",
"tempfile",
"tokio 0.2.22",
"tracing",
"tracing-futures",
]
[[package]]
+1
View File
@@ -25,6 +25,7 @@ polkadot-parachain = { path = "../parachain", optional = true }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
tracing-futures = "0.2.4"
frame-benchmarking-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
+7
View File
@@ -59,6 +59,13 @@ pub type Hash = sp_core::H256;
#[derive(Clone, Copy, Encode, Decode, Hash, Eq, PartialEq, Debug, Default)]
pub struct CandidateHash(pub Hash);
#[cfg(feature="std")]
impl std::fmt::Display for CandidateHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
/// Index of a transaction in the relay chain. 32-bit should be plenty.
pub type Nonce = u32;
@@ -6,7 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
polkadot-erasure-coding = { path = "../../erasure-coding" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem = { path = "../subsystem" }
+20 -15
View File
@@ -74,6 +74,7 @@ impl CollationGenerationSubsystem {
///
/// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
/// Otherwise, most are logged and then discarded.
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run<Context>(mut self, mut ctx: Context)
where
Context: SubsystemContext<Message = CollationGenerationMessage>,
@@ -95,7 +96,7 @@ impl CollationGenerationSubsystem {
msg = receiver.next().fuse() => {
if let Some(msg) = msg {
if let Err(err) = ctx.send_message(msg).await {
log::warn!(target: LOG_TARGET, "failed to forward message to overseer: {:?}", err);
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to forward message to overseer");
break;
}
}
@@ -108,6 +109,7 @@ impl CollationGenerationSubsystem {
// note: this doesn't strictly need to be a separate function; it's more an administrative function
// so that we don't clutter the run loop. It could in principle be inlined directly into there.
// it should hopefully therefore be ok that it's an async function mutably borrowing self.
#[tracing::instrument(level = "trace", skip(self, ctx, sender), fields(subsystem = LOG_TARGET))]
async fn handle_incoming<Context>(
&mut self,
incoming: SubsystemResult<FromOverseer<Context::Message>>,
@@ -129,7 +131,7 @@ impl CollationGenerationSubsystem {
if let Err(err) =
handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await
{
log::warn!(target: LOG_TARGET, "failed to handle new activations: {}", err);
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activations");
};
}
false
@@ -139,7 +141,7 @@ impl CollationGenerationSubsystem {
msg: CollationGenerationMessage::Initialize(config),
}) => {
if self.config.is_some() {
log::error!(target: LOG_TARGET, "double initialization");
tracing::error!(target: LOG_TARGET, "double initialization");
} else {
self.config = Some(Arc::new(config));
}
@@ -147,8 +149,9 @@ impl CollationGenerationSubsystem {
}
Ok(Signal(BlockFinalized(_))) => false,
Err(err) => {
log::error!(
tracing::error!(
target: LOG_TARGET,
err = ?err,
"error receiving message from subsystem context: {:?}",
err
);
@@ -175,6 +178,7 @@ where
}
}
#[tracing::instrument(level = "trace", skip(ctx, metrics, sender), fields(subsystem = LOG_TARGET))]
async fn handle_new_activations<Context: SubsystemContext>(
config: Arc<CollationGenerationConfig>,
activated: &[Hash],
@@ -237,10 +241,10 @@ async fn handle_new_activations<Context: SubsystemContext>(
let collation = match (task_config.collator)(relay_parent, &validation_data).await {
Some(collation) => collation,
None => {
log::debug!(
tracing::debug!(
target: LOG_TARGET,
"collator returned no collation on collate for para_id {}.",
scheduled_core.para_id,
para_id = %scheduled_core.para_id,
"collator returned no collation on collate",
);
return
}
@@ -262,11 +266,11 @@ async fn handle_new_activations<Context: SubsystemContext>(
) {
Ok(erasure_root) => erasure_root,
Err(err) => {
log::error!(
tracing::error!(
target: LOG_TARGET,
"failed to calculate erasure root for para_id {}: {:?}",
scheduled_core.para_id,
err
para_id = %scheduled_core.para_id,
err = ?err,
"failed to calculate erasure root",
);
return
}
@@ -299,11 +303,11 @@ async fn handle_new_activations<Context: SubsystemContext>(
if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
)).await {
log::warn!(
tracing::warn!(
target: LOG_TARGET,
"failed to send collation result for para_id {}: {:?}",
scheduled_core.para_id,
err
para_id = %scheduled_core.para_id,
err = ?err,
"failed to send collation result",
);
}
})).await?;
@@ -313,6 +317,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
Ok(())
}
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn erasure_root(
n_validators: usize,
persisted_validation: PersistedValidationData,
+3 -1
View File
@@ -9,8 +9,9 @@ futures = "0.3.8"
futures-timer = "3.0.2"
kvdb = "0.7.0"
kvdb-rocksdb = "0.9.1"
log = "0.4.11"
thiserror = "1.0.22"
tracing = "0.1.21"
tracing-futures = "0.2.4"
parity-scale-codec = { version = "1.3.5", features = ["derive"] }
erasure = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
@@ -22,6 +23,7 @@ polkadot-primitives = { path = "../../../primitives" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
[dev-dependencies]
log = "0.4.11"
env_logger = "0.8.2"
assert_matches = "1.4.0"
smallvec = "1.5.0"
+39 -19
View File
@@ -73,13 +73,13 @@ enum Error {
}
impl Error {
fn severity(&self) -> log::Level {
fn trace(&self) {
match self {
// don't spam the log with spurious errors
Self::RuntimeApi(_) |
Self::Oneshot(_) => log::Level::Debug,
Self::Oneshot(_) => tracing::debug!(target: LOG_TARGET, err = ?self),
// it's worth reporting otherwise
_ => log::Level::Warn,
_ => tracing::warn!(target: LOG_TARGET, err = ?self),
}
}
}
@@ -311,18 +311,19 @@ pub struct AvailabilityStoreSubsystem {
impl AvailabilityStoreSubsystem {
// Perform pruning of PoVs
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn prune_povs(&self) -> Result<(), Error> {
let mut tx = DBTransaction::new();
let mut pov_pruning = pov_pruning(&self.inner).unwrap_or_default();
let now = PruningDelay::now()?;
log::trace!(target: LOG_TARGET, "Pruning PoVs");
tracing::trace!(target: LOG_TARGET, "Pruning PoVs");
let outdated_records_count = pov_pruning.iter()
.take_while(|r| r.prune_at <= now)
.count();
for record in pov_pruning.drain(..outdated_records_count) {
log::trace!(target: LOG_TARGET, "Removing record {:?}", record);
tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
tx.delete(
columns::DATA,
available_data_key(&record.candidate_hash).as_slice(),
@@ -335,18 +336,19 @@ impl AvailabilityStoreSubsystem {
}
// Perform pruning of chunks.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn prune_chunks(&self) -> Result<(), Error> {
let mut tx = DBTransaction::new();
let mut chunk_pruning = chunk_pruning(&self.inner).unwrap_or_default();
let now = PruningDelay::now()?;
log::trace!(target: LOG_TARGET, "Pruning Chunks");
tracing::trace!(target: LOG_TARGET, "Pruning Chunks");
let outdated_records_count = chunk_pruning.iter()
.take_while(|r| r.prune_at <= now)
.count();
for record in chunk_pruning.drain(..outdated_records_count) {
log::trace!(target: LOG_TARGET, "Removing record {:?}", record);
tracing::trace!(target: LOG_TARGET, record = ?record, "Removing record");
tx.delete(
columns::DATA,
erasure_chunk_key(&record.candidate_hash, record.chunk_index).as_slice(),
@@ -361,6 +363,7 @@ impl AvailabilityStoreSubsystem {
// Return a `Future` that either resolves when another PoV pruning has to happen
// or is indefinitely `pending` in case no pruning has to be done.
// Just a helper to `select` over multiple things at once.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn maybe_prune_povs(&self) -> Result<impl Future<Output = ()>, Error> {
let future = match get_next_pov_pruning_time(&self.inner) {
Some(pruning) => {
@@ -375,6 +378,7 @@ impl AvailabilityStoreSubsystem {
// Return a `Future` that either resolves when another chunk pruning has to happen
// or is indefinitely `pending` in case no pruning has to be done.
// Just a helper to `select` over multiple things at once.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn maybe_prune_chunks(&self) -> Result<impl Future<Output = ()>, Error> {
let future = match get_next_chunk_pruning_time(&self.inner) {
Some(pruning) => {
@@ -473,6 +477,7 @@ fn get_next_chunk_pruning_time(db: &Arc<dyn KeyValueDB>) -> Option<NextChunkPrun
query_inner(db, columns::META, &NEXT_CHUNK_PRUNING)
}
#[tracing::instrument(skip(subsystem, ctx), fields(subsystem = LOG_TARGET))]
async fn run<Context>(mut subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>,
@@ -481,10 +486,10 @@ where
let res = run_iteration(&mut subsystem, &mut ctx).await;
match res {
Err(e) => {
log::log!(target: LOG_TARGET, e.severity(), "{}", e);
e.trace();
}
Ok(true) => {
log::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting");
break;
},
Ok(false) => continue,
@@ -492,6 +497,7 @@ where
}
}
#[tracing::instrument(level = "trace", skip(subsystem, ctx), fields(subsystem = LOG_TARGET))]
async fn run_iteration<Context>(subsystem: &mut AvailabilityStoreSubsystem, ctx: &mut Context)
-> Result<bool, Error>
where
@@ -545,6 +551,7 @@ where
/// The state of data has to be changed from
/// `CandidateState::Included` to `CandidateState::Finalized` and their pruning times have
/// to be updated to `now` + keep_finalized_{block, chunk}_for`.
#[tracing::instrument(level = "trace", skip(subsystem, ctx, db), fields(subsystem = LOG_TARGET))]
async fn process_block_finalized<Context>(
subsystem: &AvailabilityStoreSubsystem,
ctx: &mut Context,
@@ -561,10 +568,10 @@ where
// numbers we have to iterate through the whole collection here.
for record in pov_pruning.iter_mut() {
if record.block_number <= block_number {
log::trace!(
tracing::trace!(
target: LOG_TARGET,
"Updating pruning record for finalized block {}",
record.block_number,
block_number = %record.block_number,
"Updating pruning record for finalized block",
);
record.prune_at = PruningDelay::into_the_future(
@@ -580,10 +587,10 @@ where
if let Some(mut chunk_pruning) = chunk_pruning(db) {
for record in chunk_pruning.iter_mut() {
if record.block_number <= block_number {
log::trace!(
tracing::trace!(
target: LOG_TARGET,
"Updating chunk pruning record for finalized block {}",
record.block_number,
block_number = %record.block_number,
"Updating chunk pruning record for finalized block",
);
record.prune_at = PruningDelay::into_the_future(
@@ -599,6 +606,7 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(ctx, db), fields(subsystem = LOG_TARGET))]
async fn process_block_activated<Context>(
ctx: &mut Context,
db: &Arc<dyn KeyValueDB>,
@@ -610,17 +618,21 @@ where
let events = match request_candidate_events(ctx, hash).await {
Ok(events) => events,
Err(err) => {
log::debug!(target: LOG_TARGET, "requesting candidate events failed due to {}", err);
tracing::debug!(target: LOG_TARGET, err = ?err, "requesting candidate events failed");
return Ok(());
}
};
log::trace!(target: LOG_TARGET, "block activated {}", hash);
tracing::trace!(target: LOG_TARGET, hash = %hash, "block activated");
let mut included = HashSet::new();
for event in events.into_iter() {
if let CandidateEvent::CandidateIncluded(receipt, _) = event {
log::trace!(target: LOG_TARGET, "Candidate {:?} was included", receipt.hash());
tracing::trace!(
target: LOG_TARGET,
hash = %receipt.hash(),
"Candidate {:?} was included", receipt.hash(),
);
included.insert(receipt.hash());
}
}
@@ -654,6 +666,7 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn request_candidate_events<Context>(
ctx: &mut Context,
hash: Hash,
@@ -673,6 +686,7 @@ where
Ok(rx.await??)
}
#[tracing::instrument(level = "trace", skip(subsystem, ctx), fields(subsystem = LOG_TARGET))]
async fn process_message<Context>(
subsystem: &mut AvailabilityStoreSubsystem,
ctx: &mut Context,
@@ -744,6 +758,7 @@ fn chunk_pruning(db: &Arc<dyn KeyValueDB>) -> Option<Vec<ChunkPruningRecord>> {
query_inner(db, columns::META, &CHUNK_PRUNING_KEY)
}
#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))]
fn put_pov_pruning(
db: &Arc<dyn KeyValueDB>,
tx: Option<DBTransaction>,
@@ -784,6 +799,7 @@ fn put_pov_pruning(
Ok(())
}
#[tracing::instrument(level = "trace", skip(db, tx), fields(subsystem = LOG_TARGET))]
fn put_chunk_pruning(
db: &Arc<dyn KeyValueDB>,
tx: Option<DBTransaction>,
@@ -836,6 +852,7 @@ where
Ok(rx.await??.map(|number| number).unwrap_or_default())
}
#[tracing::instrument(level = "trace", skip(subsystem, available_data), fields(subsystem = LOG_TARGET))]
fn store_available_data(
subsystem: &mut AvailabilityStoreSubsystem,
candidate_hash: &CandidateHash,
@@ -902,6 +919,7 @@ fn store_available_data(
Ok(())
}
#[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))]
fn store_chunk(
subsystem: &mut AvailabilityStoreSubsystem,
candidate_hash: &CandidateHash,
@@ -953,6 +971,7 @@ fn store_chunk(
Ok(())
}
#[tracing::instrument(level = "trace", skip(subsystem), fields(subsystem = LOG_TARGET))]
fn get_chunk(
subsystem: &mut AvailabilityStoreSubsystem,
candidate_hash: &CandidateHash,
@@ -996,7 +1015,7 @@ fn query_inner<D: Decode>(
}
Ok(None) => None,
Err(e) => {
log::warn!(target: LOG_TARGET, "Error reading from the availability store: {:?}", e);
tracing::warn!(target: LOG_TARGET, err = ?e, "Error reading from the availability store");
None
}
}
@@ -1018,6 +1037,7 @@ where
}
}
#[tracing::instrument(level = "trace", skip(metrics), fields(subsystem = LOG_TARGET))]
fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> Result<Vec<ErasureChunk>, Error> {
let chunks = erasure::obtain_chunks_v1(n_validators, data)?;
metrics.on_chunks_received(chunks.len());
+3 -3
View File
@@ -128,7 +128,7 @@ async fn overseer_send(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
msg: AvailabilityStoreMessage,
) {
log::trace!("Sending message:\n{:?}", &msg);
tracing::trace!(meg = ?msg, "sending message");
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
@@ -143,7 +143,7 @@ async fn overseer_recv(
.await
.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
log::trace!("Received message:\n{:?}", &msg);
tracing::trace!(msg = ?msg, "received message");
msg
}
@@ -152,7 +152,7 @@ async fn overseer_recv_with_timeout(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
timeout: Duration,
) -> Option<AllMessages> {
log::trace!("Waiting for message...");
tracing::trace!("waiting for message...");
overseer
.recv()
.timeout(timeout)
+2 -1
View File
@@ -14,7 +14,8 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
statement-table = { package = "polkadot-statement-table", path = "../../../statement-table" }
bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
thiserror = "1.0.22"
[dev-dependencies]
+18 -6
View File
@@ -244,6 +244,7 @@ fn primitive_statement_to_table(s: &SignedFullStatement) -> TableSignedStatement
}
}
#[tracing::instrument(level = "trace", skip(attested, table_context), fields(subsystem = LOG_TARGET))]
fn table_attested_to_backed(
attested: TableAttestedCandidate<
ParaId,
@@ -308,6 +309,7 @@ impl CandidateBackingJob {
/// Validate the candidate that is requested to be `Second`ed and distribute validation result.
///
/// Returns `Ok(true)` if we issued a `Seconded` statement about this candidate.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn validate_and_second(
&mut self,
candidate: &CandidateReceipt,
@@ -390,6 +392,7 @@ impl CandidateBackingJob {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn get_backed(&self) -> Vec<NewBackedCandidate> {
let proposed = self.table.proposed_candidates(&self.table_context);
let mut res = Vec::with_capacity(proposed.len());
@@ -407,6 +410,7 @@ impl CandidateBackingJob {
/// Check if there have happened any new misbehaviors and issue necessary messages.
///
/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
let mut reports = Vec::new();
@@ -440,6 +444,7 @@ impl CandidateBackingJob {
}
/// Import a statement into the statement table and return the summary of the import.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn import_statement(
&mut self,
statement: &SignedFullStatement,
@@ -474,6 +479,7 @@ impl CandidateBackingJob {
Ok(summary)
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn process_msg(&mut self, msg: CandidateBackingMessage) -> Result<(), Error> {
match msg {
CandidateBackingMessage::Second(_, candidate, pov) => {
@@ -521,6 +527,7 @@ impl CandidateBackingJob {
}
/// Kick off validation work and distribute the result as a signed statement.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn kick_off_validation_work(
&mut self,
summary: TableSummary,
@@ -585,6 +592,7 @@ impl CandidateBackingJob {
}
/// Import the statement and kick off validation work if it is a part of our assignment.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn maybe_validate_and_import(
&mut self,
statement: SignedFullStatement,
@@ -600,6 +608,7 @@ impl CandidateBackingJob {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn sign_statement(&self, statement: Statement) -> Option<SignedFullStatement> {
let signed = self.table_context
.validator
@@ -611,6 +620,7 @@ impl CandidateBackingJob {
Some(signed)
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
let idx = statement.validator_index() as usize;
@@ -703,6 +713,7 @@ impl CandidateBackingJob {
// This calls an inspection function before making the PoV available for any last checks
// that need to be done. If the inspection function returns an error, this function returns
// early without making the PoV available.
#[tracing::instrument(level = "trace", skip(self, pov, with_commitments), fields(subsystem = LOG_TARGET))]
async fn make_pov_available<T, E>(
&mut self,
pov: Arc<PoV>,
@@ -767,6 +778,7 @@ impl util::JobTrait for CandidateBackingJob {
const NAME: &'static str = "CandidateBackingJob";
#[tracing::instrument(skip(keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
fn run(
parent: Hash,
keystore: SyncCryptoStorePtr,
@@ -780,10 +792,10 @@ impl util::JobTrait for CandidateBackingJob {
match $x {
Ok(x) => x,
Err(e) => {
log::warn!(
tracing::warn!(
target: LOG_TARGET,
"Failed to fetch runtime API data for job: {:?}",
e,
err = ?e,
"Failed to fetch runtime API data for job",
);
// We can't do candidate validation work if we don't have the
@@ -820,10 +832,10 @@ impl util::JobTrait for CandidateBackingJob {
Ok(v) => v,
Err(util::Error::NotAValidator) => { return Ok(()) },
Err(e) => {
log::warn!(
tracing::warn!(
target: LOG_TARGET,
"Cannot participate in candidate backing: {:?}",
e
err = ?e,
"Cannot participate in candidate backing",
);
return Ok(())
@@ -6,7 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
@@ -140,6 +140,7 @@ pub enum Error {
/// If there is a candidate pending availability, query the Availability Store
/// for whether we have the availability chunk for our validator index.
#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))]
async fn get_core_availability(
relay_parent: Hash,
core: CoreState,
@@ -164,7 +165,7 @@ async fn get_core_availability(
Ok(None) => return Ok(false),
Err(e) => {
// Don't take down the node on runtime API errors.
log::warn!(target: LOG_TARGET, "Encountered a runtime API error: {:?}", e);
tracing::warn!(target: LOG_TARGET, err = ?e, "Encountered a runtime API error");
return Ok(false);
}
};
@@ -201,6 +202,7 @@ async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<Fr
/// - for each core, concurrently determine chunk availability (see `get_core_availability`)
/// - return the bitfield if there were no errors at any point in this process
/// (otherwise, it's prone to false negatives)
#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))]
async fn construct_availability_bitfield(
relay_parent: Hash,
validator_idx: ValidatorIndex,
@@ -267,6 +269,7 @@ impl JobTrait for BitfieldSigningJob {
const NAME: &'static str = "BitfieldSigningJob";
/// Run a job for the parent block indicated
#[tracing::instrument(skip(keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
relay_parent: Hash,
keystore: Self::RunArgs,
@@ -293,7 +296,7 @@ impl JobTrait for BitfieldSigningJob {
{
Err(Error::Runtime(runtime_err)) => {
// Don't take down the node on runtime API errors.
log::warn!(target: LOG_TARGET, "Encountered a runtime API error: {:?}", runtime_err);
tracing::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
return Ok(());
}
Err(err) => return Err(err),
@@ -6,7 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
thiserror = "1.0.22"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
@@ -37,7 +37,7 @@ use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId,
use std::{convert::TryFrom, pin::Pin};
use thiserror::Error;
const TARGET: &'static str = "candidate_selection";
const LOG_TARGET: &'static str = "candidate_selection";
struct CandidateSelectionJob {
sender: mpsc::Sender<FromJob>,
@@ -134,6 +134,7 @@ impl JobTrait for CandidateSelectionJob {
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
#[tracing::instrument(skip(_relay_parent, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
_relay_parent: Hash,
_run_args: Self::RunArgs,
@@ -196,6 +197,7 @@ impl CandidateSelectionJob {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn handle_collation(
&mut self,
relay_parent: Hash,
@@ -212,10 +214,10 @@ impl CandidateSelectionJob {
).await {
Ok(response) => response,
Err(err) => {
log::warn!(
target: TARGET,
"failed to get collation from collator protocol subsystem: {:?}",
err
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to get collation from collator protocol subsystem",
);
return;
}
@@ -230,35 +232,36 @@ impl CandidateSelectionJob {
)
.await
{
Err(err) => log::warn!(target: TARGET, "failed to second a candidate: {:?}", err),
Err(err) => tracing::warn!(target: LOG_TARGET, err = ?err, "failed to second a candidate"),
Ok(()) => self.seconded_candidate = Some(collator_id),
}
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn handle_invalid(&mut self, candidate_receipt: CandidateReceipt) {
let received_from = match &self.seconded_candidate {
Some(peer) => peer,
None => {
log::warn!(
target: TARGET,
tracing::warn!(
target: LOG_TARGET,
"received invalidity notice for a candidate we don't remember seconding"
);
return;
}
};
log::info!(
target: TARGET,
"received invalidity note for candidate {:?}",
candidate_receipt
tracing::info!(
target: LOG_TARGET,
candidate_receipt = ?candidate_receipt,
"received invalidity note for candidate",
);
let result =
if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await {
log::warn!(
target: TARGET,
"failed to forward invalidity note: {:?}",
err
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to forward invalidity note",
);
Err(())
} else {
@@ -271,6 +274,7 @@ impl CandidateSelectionJob {
// get a collation from the Collator Protocol subsystem
//
// note that this gets an owned clone of the sender; that's becuase unlike `forward_invalidity_note`, it's expected to take a while longer
#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))]
async fn get_collation(
relay_parent: Hash,
para_id: ParaId,
@@ -305,7 +309,7 @@ async fn second_candidate(
.await
{
Err(err) => {
log::warn!(target: TARGET, "failed to send a seconding message");
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to send a seconding message");
metrics.on_second(Err(()));
Err(err.into())
}
@@ -6,7 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
sp-core = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" }
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["bit-vec", "derive"] }
@@ -85,6 +85,7 @@ impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
}
}
#[tracing::instrument(skip(ctx, spawn, metrics), fields(subsystem = LOG_TARGET))]
async fn run(
mut ctx: impl SubsystemContext<Message = CandidateValidationMessage>,
spawn: impl SpawnNamed + Clone + 'static,
@@ -139,7 +140,7 @@ async fn run(
Ok(x) => {
metrics.on_validation_event(&x);
if let Err(_e) = response_sender.send(x) {
log::warn!(
tracing::warn!(
target: LOG_TARGET,
"Requester of candidate validation dropped",
)
@@ -176,6 +177,7 @@ enum AssumptionCheckOutcome {
BadRequest,
}
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn check_assumption_validation_data(
ctx: &mut impl SubsystemContext<Message = CandidateValidationMessage>,
descriptor: &CandidateDescriptor,
@@ -226,6 +228,7 @@ async fn check_assumption_validation_data(
})
}
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn find_assumed_validation_data(
ctx: &mut impl SubsystemContext<Message = CandidateValidationMessage>,
descriptor: &CandidateDescriptor,
@@ -257,6 +260,7 @@ async fn find_assumed_validation_data(
Ok(AssumptionCheckOutcome::DoesNotMatch)
}
#[tracing::instrument(level = "trace", skip(ctx, pov, spawn), fields(subsystem = LOG_TARGET))]
async fn spawn_validate_from_chain_state(
ctx: &mut impl SubsystemContext<Message = CandidateValidationMessage>,
isolation_strategy: IsolationStrategy,
@@ -316,6 +320,7 @@ async fn spawn_validate_from_chain_state(
validation_result
}
#[tracing::instrument(level = "trace", skip(ctx, validation_code, pov, spawn), fields(subsystem = LOG_TARGET))]
async fn spawn_validate_exhaustive(
ctx: &mut impl SubsystemContext<Message = CandidateValidationMessage>,
isolation_strategy: IsolationStrategy,
@@ -345,6 +350,7 @@ async fn spawn_validate_exhaustive(
/// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks
/// are passed, `Err` otherwise.
#[tracing::instrument(level = "trace", skip(pov), fields(subsystem = LOG_TARGET))]
fn perform_basic_checks(
candidate: &CandidateDescriptor,
max_pov_size: u32,
@@ -402,6 +408,7 @@ impl ValidationBackend for RealValidationBackend {
/// Validates the candidate from exhaustive parameters.
///
/// Sends the result of validation on the channel once complete.
#[tracing::instrument(level = "trace", skip(backend_arg, validation_code, pov, spawn), fields(subsystem = LOG_TARGET))]
fn validate_candidate_exhaustive<B: ValidationBackend, S: SpawnNamed + 'static>(
backend_arg: B::Arg,
persisted_validation_data: PersistedValidationData,
+2
View File
@@ -6,6 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
tracing = "0.1.21"
tracing-futures = "0.2.4"
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
+4
View File
@@ -44,6 +44,8 @@ use std::sync::Arc;
use futures::prelude::*;
const LOG_TARGET: &str = "ChainApiSubsystem";
/// The Chain API Subsystem implementation.
pub struct ChainApiSubsystem<Client> {
client: Arc<Client>,
@@ -75,6 +77,7 @@ impl<Client, Context> Subsystem<Context> for ChainApiSubsystem<Client> where
}
}
#[tracing::instrument(skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
async fn run<Client>(
mut ctx: impl SubsystemContext<Message = ChainApiMessage>,
subsystem: ChainApiSubsystem<Client>,
@@ -113,6 +116,7 @@ where
let _ = response_channel.send(Ok(result));
},
ChainApiMessage::Ancestors { hash, k, response_channel } => {
tracing::span!(tracing::Level::TRACE, "ChainApiMessage::Ancestors", subsystem=LOG_TARGET, hash=%hash, k=k);
let mut hash = hash;
let next_parent = core::iter::from_fn(|| {
+1 -1
View File
@@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
futures-timer = "3.0.2"
log = "0.4.11"
tracing = "0.1.21"
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-overseer = { path = "../../overseer" }
polkadot-primitives = { path = "../../../primitives" }
+1 -1
View File
@@ -193,7 +193,7 @@ where
let provisioner_data = match self.get_provisioner_data().await {
Ok(pd) => pd,
Err(err) => {
log::warn!("could not get provisioner inherent data; injecting default data: {}", err);
tracing::warn!(err = ?err, "could not get provisioner inherent data; injecting default data");
Default::default()
}
};
+2 -1
View File
@@ -7,7 +7,8 @@ edition = "2018"
[dependencies]
bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
thiserror = "1.0.22"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
+11 -2
View File
@@ -152,6 +152,7 @@ impl JobTrait for ProvisioningJob {
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
#[tracing::instrument(skip(_run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
relay_parent: Hash,
_run_args: Self::RunArgs,
@@ -205,7 +206,7 @@ impl ProvisioningJob {
)
.await
{
log::warn!(target: LOG_TARGET, "failed to assemble or send inherent data: {:?}", err);
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
self.metrics.on_inherent_data_request(Err(()));
} else {
self.metrics.on_inherent_data_request(Ok(()));
@@ -254,6 +255,7 @@ impl ProvisioningJob {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn note_provisionable_data(&mut self, provisionable_data: ProvisionableData) {
match provisionable_data {
ProvisionableData::Bitfield(_, signed_bitfield) => {
@@ -286,6 +288,7 @@ type CoreAvailability = BitVec<bitvec::order::Lsb0, u8>;
/// When we're choosing bitfields to include, the rule should be simple:
/// maximize availability. So basically, include all bitfields. And then
/// choose a coherent set of candidates along with that.
#[tracing::instrument(level = "trace", skip(return_sender, from_job), fields(subsystem = LOG_TARGET))]
async fn send_inherent_data(
relay_parent: Hash,
bitfields: &[SignedAvailabilityBitfield],
@@ -323,6 +326,7 @@ async fn send_inherent_data(
///
/// Note: This does not enforce any sorting precondition on the output; the ordering there will be unrelated
/// to the sorting of the input.
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn select_availability_bitfields(
cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
@@ -354,6 +358,7 @@ fn select_availability_bitfields(
}
/// Determine which cores are free, and then to the degree possible, pick a candidate appropriate to each free core.
#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))]
async fn select_candidates(
availability_cores: &[CoreState],
bitfields: &[SignedAvailabilityBitfield],
@@ -420,6 +425,7 @@ async fn select_candidates(
/// Produces a block number 1 higher than that of the relay parent
/// in the event of an invalid `relay_parent`, returns `Ok(0)`
#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))]
async fn get_block_number_under_construction(
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJob>,
@@ -445,6 +451,7 @@ async fn get_block_number_under_construction(
/// - construct a transverse slice along `core_idx`
/// - bitwise-or it with the availability slice
/// - count the 1 bits, compare to the total length; true on 2/3+
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn bitfields_indicate_availability(
core_idx: usize,
bitfields: &[SignedAvailabilityBitfield],
@@ -460,8 +467,10 @@ fn bitfields_indicate_availability(
// in principle, this function might return a `Result<bool, Error>` so that we can more clearly express this error condition
// however, in practice, that would just push off an error-handling routine which would look a whole lot like this one.
// simpler to just handle the error internally here.
log::warn!(
tracing::warn!(
target: LOG_TARGET,
validator_idx = %validator_idx,
availability_len = %availability_len,
"attempted to set a transverse bit at idx {} which is greater than bitfield size {}",
validator_idx,
availability_len,
@@ -6,6 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
tracing = "0.1.21"
tracing-futures = "0.2.4"
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
@@ -40,6 +40,8 @@ use sp_api::{ProvideRuntimeApi};
use futures::prelude::*;
const LOG_TARGET: &str = "RuntimeApi";
/// The `RuntimeApiSubsystem`. See module docs for more details.
pub struct RuntimeApiSubsystem<Client> {
client: Arc<Client>,
@@ -66,6 +68,7 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
}
}
#[tracing::instrument(skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
async fn run<Client>(
mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
subsystem: RuntimeApiSubsystem<Client>,
@@ -90,6 +93,7 @@ async fn run<Client>(
}
}
#[tracing::instrument(level = "trace", skip(client, metrics), fields(subsystem = LOG_TARGET))]
fn make_runtime_api_request<Client>(
client: &Client,
metrics: &Metrics,
@@ -6,8 +6,9 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
parity-scale-codec = { version = "1.3.5", features = ["std"] }
tracing = "0.1.21"
tracing-futures = "0.2.4"
parity-scale-codec = { version = "1.3.5", features = ["std"] }
polkadot-primitives = { path = "../../../primitives" }
polkadot-erasure-coding = { path = "../../../erasure-coding" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
@@ -27,3 +28,4 @@ futures-timer = "3.0.2"
env_logger = "0.8.2"
assert_matches = "1.4.0"
smallvec = "1.5.0"
log = "0.4.11"
@@ -30,7 +30,6 @@ use futures::{channel::oneshot, FutureExt, TryFutureExt};
use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use log::{trace, warn};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
@@ -53,7 +52,7 @@ use std::collections::{HashMap, HashSet};
use std::iter;
use thiserror::Error;
const TARGET: &'static str = "avad";
const LOG_TARGET: &'static str = "AvailabilityDistribution";
#[derive(Debug, Error)]
enum Error {
@@ -197,6 +196,7 @@ struct PerRelayParent {
impl ProtocolState {
/// Collects the relay_parents ancestors including the relay parents themselfes.
#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))]
fn extend_with_ancestors<'a>(
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
@@ -218,6 +218,7 @@ impl ProtocolState {
/// Unionize all cached entries for the given relay parents and its ancestors.
/// Ignores all non existent relay parents, so this can be used directly with a peers view.
/// Returns a map from candidate hash -> receipt
#[tracing::instrument(level = "trace", skip(relay_parents), fields(subsystem = LOG_TARGET))]
fn cached_live_candidates_unioned<'a>(
&'a self,
relay_parents: impl IntoIterator<Item = &'a Hash> + 'a,
@@ -232,6 +233,7 @@ impl ProtocolState {
.collect()
}
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn add_relay_parent<Context>(
&mut self,
ctx: &mut Context,
@@ -287,6 +289,7 @@ impl ProtocolState {
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn remove_relay_parent(&mut self, relay_parent: &Hash) -> Result<()> {
// we might be ancestor of some other relay_parent
if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) {
@@ -327,6 +330,7 @@ impl ProtocolState {
/// Deal with network bridge updates and track what needs to be tracked
/// which depends on the message type received.
#[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
@@ -370,6 +374,7 @@ where
}
/// Handle the changes necessary when our view changes.
#[tracing::instrument(level = "trace", skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
@@ -507,6 +512,7 @@ where
.await
}
#[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))]
async fn send_tracked_gossip_messages_to_peers<Context>(
ctx: &mut Context,
per_candidate: &mut PerCandidate,
@@ -556,6 +562,7 @@ where
// Send the difference between two views which were not sent
// to that particular peer.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
@@ -633,6 +640,7 @@ async fn obtain_our_validator_index(
}
/// Handle an incoming message from a peer.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn process_incoming_peer_message<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
@@ -711,8 +719,8 @@ where
)
.await?
{
warn!(
target: TARGET,
tracing::warn!(
target: LOG_TARGET,
"Failed to store erasure chunk to availability store"
);
}
@@ -771,6 +779,7 @@ impl AvailabilityDistributionSubsystem {
}
/// Start processing work as passed on from the Overseer.
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run<Context>(self, mut ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -795,9 +804,10 @@ impl AvailabilityDistributionSubsystem {
)
.await
{
warn!(
target: TARGET,
"Failed to handle incoming network messages: {:?}", e
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to handle incoming network messages",
);
}
}
@@ -834,6 +844,7 @@ where
}
/// Obtain all live candidates based on an iterator of relay heads.
#[tracing::instrument(level = "trace", skip(ctx, relay_parents), fields(subsystem = LOG_TARGET))]
async fn query_live_candidates_without_ancestors<Context>(
ctx: &mut Context,
relay_parents: impl IntoIterator<Item = Hash>,
@@ -859,6 +870,7 @@ where
/// Obtain all live candidates based on an iterator or relay heads including `k` ancestors.
///
/// Relay parent.
#[tracing::instrument(level = "trace", skip(ctx, relay_parents), fields(subsystem = LOG_TARGET))]
async fn query_live_candidates<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
@@ -921,6 +933,7 @@ where
}
/// Query all para IDs.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_para_ids<Context>(ctx: &mut Context, relay_parent: Hash) -> Result<Vec<ParaId>>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -952,15 +965,16 @@ where
}
/// Modify the reputation of a peer based on its behavior.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
trace!(
target: TARGET,
"Reputation change of {:?} for peer {:?}",
rep,
peer
tracing::trace!(
target: LOG_TARGET,
rep = ?rep,
peer_id = ?peer,
"Reputation change for peer",
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
@@ -970,6 +984,7 @@ where
}
/// Query the proof of validity for a particular candidate hash.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_data_availability<Context>(ctx: &mut Context, candidate_hash: CandidateHash) -> Result<bool>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -984,6 +999,7 @@ where
.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
}
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_chunk<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
@@ -1001,6 +1017,7 @@ where
rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
}
#[tracing::instrument(level = "trace", skip(ctx, erasure_chunk), fields(subsystem = LOG_TARGET))]
async fn store_chunk<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
@@ -1028,6 +1045,7 @@ where
}
/// Request the head data for a particular para.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_pending_availability<Context>(
ctx: &mut Context,
relay_parent: Hash,
@@ -1050,6 +1068,7 @@ where
}
/// Query the validator set.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_validators<Context>(
ctx: &mut Context,
relay_parent: Hash,
@@ -1072,6 +1091,7 @@ where
}
/// Query the hash of the `K` ancestors
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_k_ancestors<Context>(
ctx: &mut Context,
relay_parent: Hash,
@@ -1096,6 +1116,7 @@ where
}
/// Query the session index of a relay parent
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_session_index_for_child<Context>(
ctx: &mut Context,
relay_parent: Hash,
@@ -1118,6 +1139,7 @@ where
}
/// Queries up to k ancestors with the constraints of equiv session
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_up_to_k_ancestors_in_same_session<Context>(
ctx: &mut Context,
relay_parent: Hash,
@@ -103,7 +103,7 @@ async fn overseer_send(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
msg: AvailabilityDistributionMessage,
) {
log::trace!("Sending message:\n{:?}", &msg);
tracing::trace!(msg = ?msg, "sending message");
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
@@ -114,13 +114,13 @@ async fn overseer_send(
async fn overseer_recv(
overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
) -> AllMessages {
log::trace!("Waiting for message ...");
tracing::trace!("waiting for message ...");
let msg = overseer
.recv()
.timeout(TIMEOUT)
.await
.expect("TIMEOUT is enough to recv.");
log::trace!("Received message:\n{:?}", &msg);
tracing::trace!(msg = ?msg, "received message");
msg
}
@@ -439,11 +439,11 @@ fn reputation_verification() {
let peer_b = PeerId::random();
assert_ne!(&peer_a, &peer_b);
log::trace!("peer A: {:?}", peer_a);
log::trace!("peer B: {:?}", peer_b);
tracing::trace!("peer A: {:?}", peer_a);
tracing::trace!("peer B: {:?}", peer_b);
log::trace!("candidate A: {:?}", candidates[0].hash());
log::trace!("candidate B: {:?}", candidates[1].hash());
tracing::trace!("candidate A: {:?}", candidates[0].hash());
tracing::trace!("candidate B: {:?}", candidates[1].hash());
overseer_signal(
&mut virtual_overseer,
@@ -627,7 +627,7 @@ fn reputation_verification() {
let mut candidates2 = candidates.clone();
// check if the availability store can provide the desired erasure chunks
for i in 0usize..2 {
log::trace!("0000");
tracing::trace!("0000");
let avail_data = make_available_data(&test_state, pov_block_a.clone());
let chunks =
derive_erasure_chunks_with_proofs(test_state.validators.len(), &avail_data);
@@ -652,10 +652,10 @@ fn reputation_verification() {
assert_eq!(chunks.len(), test_state.validators.len());
log::trace!("xxxx");
tracing::trace!("xxxx");
// retrieve a stored chunk
for (j, chunk) in chunks.into_iter().enumerate() {
log::trace!("yyyy i={}, j={}", i, j);
tracing::trace!("yyyy i={}, j={}", i, j);
if i != 0 {
// not a validator, so this never happens
break;
@@ -6,7 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
@@ -21,6 +22,7 @@ sp-application-crypto = { git = "https://github.com/paritytech/substrate", branc
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
maplit = "1.0.2"
log = "0.4.11"
env_logger = "0.8.2"
assert_matches = "1.4.0"
tempfile = "3.1.0"
@@ -25,7 +25,6 @@
use parity_scale_codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
use log::{debug, trace, warn};
use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
@@ -79,7 +78,7 @@ impl BitfieldGossipMessage {
/// Data used to track information of peers and relay parents the
/// overseer ordered us to work on.
#[derive(Default, Clone)]
#[derive(Default, Clone, Debug)]
struct ProtocolState {
/// track all active peers and their views
/// to determine what is relevant to them.
@@ -144,6 +143,7 @@ impl BitfieldDistribution {
}
/// Start processing work as passed on from the Overseer.
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run<Context>(self, mut ctx: Context)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
@@ -154,7 +154,7 @@ impl BitfieldDistribution {
let message = match ctx.recv().await {
Ok(message) => message,
Err(e) => {
debug!(target: LOG_TARGET, "Failed to receive a message from Overseer: {}, exiting", e);
tracing::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
return;
},
};
@@ -162,7 +162,7 @@ impl BitfieldDistribution {
FromOverseer::Communication {
msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
} => {
trace!(target: LOG_TARGET, "Processing DistributeBitfield");
tracing::trace!(target: LOG_TARGET, "Processing DistributeBitfield");
if let Err(err) = handle_bitfield_distribution(
&mut ctx,
&mut state,
@@ -170,21 +170,21 @@ impl BitfieldDistribution {
hash,
signed_availability,
).await {
warn!(target: LOG_TARGET, "Failed to reply to `DistributeBitfield` message: {}", err);
tracing::warn!(target: LOG_TARGET, err = ?err, "Failed to reply to `DistributeBitfield` message");
}
}
FromOverseer::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
trace!(target: LOG_TARGET, "Processing NetworkMessage");
tracing::trace!(target: LOG_TARGET, "Processing NetworkMessage");
// a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await {
warn!(target: LOG_TARGET, "Failed to handle incoming network messages: {:?}", e);
tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to handle incoming network messages");
}
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
for relay_parent in activated {
trace!(target: LOG_TARGET, "Start {:?}", relay_parent);
tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "activated");
// query basic system parameters once
match query_basics(&mut ctx, relay_parent).await {
Ok(Some((validator_set, signing_context))) => {
@@ -203,22 +203,22 @@ impl BitfieldDistribution {
);
}
Err(e) => {
warn!(target: LOG_TARGET, "query_basics has failed: {}", e);
tracing::warn!(target: LOG_TARGET, err = ?e, "query_basics has failed");
}
_ => {},
}
}
for relay_parent in deactivated {
trace!(target: LOG_TARGET, "Stop {:?}", relay_parent);
tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "deactivated");
// defer the cleanup to the view change
}
}
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => {
trace!(target: LOG_TARGET, "Block finalized {:?}", hash);
tracing::trace!(target: LOG_TARGET, hash = %hash, "block finalized");
}
FromOverseer::Signal(OverseerSignal::Conclude) => {
trace!(target: LOG_TARGET, "Conclude");
tracing::trace!(target: LOG_TARGET, "Conclude");
return;
}
}
@@ -227,6 +227,7 @@ impl BitfieldDistribution {
}
/// Modify the reputation of a peer based on its behaviour.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(
ctx: &mut Context,
peer: PeerId,
@@ -235,7 +236,7 @@ async fn modify_reputation<Context>(
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
trace!(target: LOG_TARGET, "Reputation change of {:?} for peer {:?}", rep, peer);
tracing::trace!(target: LOG_TARGET, rep = ?rep, peer_id = %peer, "reputation change");
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
))
@@ -245,6 +246,7 @@ where
/// Distribute a given valid and signature checked bitfield message.
///
/// For this variant the source is this node.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_bitfield_distribution<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
@@ -260,17 +262,17 @@ where
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
job_data
} else {
trace!(
tracing::trace!(
target: LOG_TARGET,
"Not supposed to work on relay parent {} related data",
relay_parent
relay_parent = %relay_parent,
"Not supposed to work on relay parent related data",
);
return Ok(());
};
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
trace!(target: LOG_TARGET, "Validator set for {:?} is empty", relay_parent);
tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "validator set is empty");
return Ok(());
}
@@ -278,7 +280,7 @@ where
let validator = if let Some(validator) = validator_set.get(validator_index) {
validator.clone()
} else {
trace!(target: LOG_TARGET, "Could not find a validator for index {}", validator_index);
tracing::trace!(target: LOG_TARGET, "Could not find a validator for index {}", validator_index);
return Ok(());
};
@@ -298,6 +300,7 @@ where
/// Distribute a given valid and signature checked bitfield message.
///
/// Can be originated by another subsystem or received via network from another peer.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn relay_message<Context>(
ctx: &mut Context,
job_data: &mut PerRelayParentData,
@@ -342,10 +345,10 @@ where
.collect::<Vec<PeerId>>();
if interested_peers.is_empty() {
trace!(
tracing::trace!(
target: LOG_TARGET,
"No peers are interested in gossip for relay parent {:?}",
message.relay_parent
relay_parent = %message.relay_parent,
"no peers are interested in gossip for relay parent",
);
} else {
ctx.send_message(AllMessages::NetworkBridge(
@@ -360,6 +363,7 @@ where
}
/// Handle an incoming message from a peer.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn process_incoming_peer_message<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
@@ -385,10 +389,10 @@ where
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
trace!(
tracing::trace!(
target: LOG_TARGET,
"Validator set for relay parent {:?} is empty",
&message.relay_parent
relay_parent = %message.relay_parent,
"Validator set is empty",
);
return modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
}
@@ -427,10 +431,10 @@ where
// only relay_message a message of a validator once
if one_per_validator.get(&validator).is_some() {
trace!(
tracing::trace!(
target: LOG_TARGET,
"Already received a message for validator at index {}",
validator_index
validator_index,
"already received a message for validator",
);
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
return Ok(());
@@ -447,6 +451,7 @@ where
/// Deal with network bridge updates and track what needs to be tracked
/// which depends on the message type received.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
@@ -474,7 +479,7 @@ where
NetworkBridgeEvent::PeerMessage(remote, message) => {
match message {
protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
trace!(target: LOG_TARGET, "Received bitfield gossip from peer {:?}", &remote);
tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer");
let gossiped_bitfield = BitfieldGossipMessage {
relay_parent,
signed_availability: bitfield,
@@ -488,13 +493,15 @@ where
}
/// Handle the changes necassary when our view changes.
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn handle_our_view_change(state: &mut ProtocolState, view: View) -> SubsystemResult<()> {
let old_view = std::mem::replace(&mut (state.view), view);
for added in state.view.difference(&old_view) {
if !state.per_relay_parent.contains_key(&added) {
warn!(
tracing::warn!(
target: LOG_TARGET,
added = %added,
"Our view contains {} but the overseer never told use we should work on this",
&added
);
@@ -510,6 +517,7 @@ fn handle_our_view_change(state: &mut ProtocolState, view: View) -> SubsystemRes
// Send the difference between two views which were not sent
// to that particular peer.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
@@ -560,6 +568,7 @@ where
}
/// Send a gossip message and track it in the per relay parent data.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn send_tracked_gossip_message<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
@@ -610,6 +619,7 @@ where
}
/// Query our validator set and signing context for a particular relay parent.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_basics<Context>(
ctx: &mut Context,
relay_parent: Hash,
@@ -639,7 +649,7 @@ where
SigningContext { parent_hash: relay_parent, session_index: s },
))),
(Err(e), _) | (_, Err(e)) => {
warn!(target: LOG_TARGET, "Failed to fetch basics from runtime API: {:?}", e);
tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to fetch basics from runtime API");
Ok(None)
}
}
+2 -1
View File
@@ -7,7 +7,8 @@ edition = "2018"
[dependencies]
async-trait = "0.1.41"
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
polkadot-primitives = { path = "../../../primitives" }
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] }
sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
+24 -12
View File
@@ -68,7 +68,7 @@ const MALFORMED_VIEW_COST: ReputationChange
= ReputationChange::new(-500, "Malformed view");
// network bridge log target
const TARGET: &'static str = "network_bridge";
const LOG_TARGET: &'static str = "network_bridge";
/// Messages received on the network.
#[derive(Debug, Encode, Decode, Clone)]
@@ -264,6 +264,7 @@ enum Action {
Nop,
}
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn action_from_overseer_message(
res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
) -> Action {
@@ -286,16 +287,17 @@ fn action_from_overseer_message(
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_)))
=> Action::Nop,
Err(e) => {
log::warn!(target: TARGET, "Shutting down Network Bridge due to error {:?}", e);
tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error");
Action::Abort
}
}
}
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn action_from_network_message(event: Option<NetworkEvent>) -> Action {
match event {
None => {
log::info!(target: TARGET, "Shutting down Network Bridge: underlying event stream concluded");
tracing::info!(target: LOG_TARGET, "Shutting down Network Bridge: underlying event stream concluded");
Action::Abort
}
Some(NetworkEvent::Dht(_)) => Action::Nop,
@@ -350,6 +352,7 @@ fn construct_view(live_heads: &[Hash]) -> View {
View(live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect())
}
#[tracing::instrument(level = "trace", skip(net, ctx, validation_peers, collation_peers), fields(subsystem = LOG_TARGET))]
async fn update_view(
net: &mut impl Network,
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
@@ -379,7 +382,7 @@ async fn update_view(
NetworkBridgeEvent::OurViewChange(new_view.clone()),
ctx,
).await {
log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer");
tracing::warn!(target: LOG_TARGET, err = ?e, "Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
@@ -387,7 +390,7 @@ async fn update_view(
NetworkBridgeEvent::OurViewChange(new_view.clone()),
ctx,
).await {
log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer");
tracing::warn!(target: LOG_TARGET, err = ?e, "Aborting - Failure to dispatch messages to overseer");
return Err(e)
}
@@ -396,6 +399,7 @@ async fn update_view(
// Handle messages on a specific peer-set. The peer is expected to be connected on that
// peer-set.
#[tracing::instrument(level = "trace", skip(peers, messages, net), fields(subsystem = LOG_TARGET))]
async fn handle_peer_messages<M>(
peer: PeerId,
peers: &mut HashMap<PeerId, PeerData>,
@@ -442,6 +446,7 @@ async fn handle_peer_messages<M>(
Ok(outgoing_messages)
}
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
async fn send_validation_message<I>(
net: &mut impl Network,
peers: I,
@@ -454,6 +459,7 @@ async fn send_validation_message<I>(
send_message(net, peers, PeerSet::Validation, message).await
}
#[tracing::instrument(level = "trace", skip(net, peers), fields(subsystem = LOG_TARGET))]
async fn send_collation_message<I>(
net: &mut impl Network,
peers: I,
@@ -516,6 +522,7 @@ async fn dispatch_collation_event_to_all(
dispatch_collation_events_to_all(std::iter::once(event), ctx).await
}
#[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))]
async fn dispatch_validation_events_to_all<I>(
events: I,
ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
@@ -547,6 +554,7 @@ async fn dispatch_validation_events_to_all<I>(
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}
#[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))]
async fn dispatch_collation_events_to_all<I>(
events: I,
ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
@@ -564,6 +572,7 @@ async fn dispatch_collation_events_to_all<I>(
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}
#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))]
async fn run_network<N, AD>(
mut network_service: N,
mut authority_discovery_service: AD,
@@ -686,7 +695,7 @@ where
};
if let Err(e) = res {
log::warn!("Aborting - Failure to dispatch messages to overseer");
tracing::warn!(err = ?e, "Aborting - Failure to dispatch messages to overseer");
return Err(e);
}
}
@@ -713,8 +722,9 @@ where
};
if let Err(e) = res {
log::warn!(
target: TARGET,
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Aborting - Failure to dispatch messages to overseer",
);
return Err(e)
@@ -734,8 +744,9 @@ where
events,
&mut ctx,
).await {
log::warn!(
target: TARGET,
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Aborting - Failure to dispatch messages to overseer",
);
return Err(e)
@@ -754,8 +765,9 @@ where
events,
&mut ctx,
).await {
log::warn!(
target: TARGET,
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Aborting - Failure to dispatch messages to overseer",
);
return Err(e)
@@ -29,6 +29,7 @@ use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
const PRIORITY_GROUP: &'static str = "parachain_validators";
const LOG_TARGET: &str = "ValidatorDiscovery";
/// An abstraction over networking for the purposes of validator discovery service.
#[async_trait]
@@ -163,6 +164,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
/// Find connected validators using the given `validator_ids`.
///
/// Returns a [`HashMap`] that contains the found [`AuthorityDiscoveryId`]'s and their associated [`PeerId`]'s.
#[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))]
async fn find_connected_validators(
&mut self,
validator_ids: &[AuthorityDiscoveryId],
@@ -201,6 +203,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
/// This method will also clean up all previously revoked requests.
/// it takes `network_service` and `authority_discovery_service` by value
/// and returns them as a workaround for the Future: Send requirement imposed by async fn impl.
#[tracing::instrument(level = "trace", skip(self, connected, revoke, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))]
pub async fn on_request(
&mut self,
validator_ids: Vec<AuthorityDiscoveryId>,
@@ -283,7 +286,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
PRIORITY_GROUP.to_owned(),
multiaddr_to_add,
).await {
log::warn!(target: super::TARGET, "AuthorityDiscoveryService returned an invalid multiaddress: {}", e);
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service.remove_from_priority_group(PRIORITY_GROUP.to_owned(), multiaddr_to_remove).await;
@@ -304,6 +307,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
/// Should be called when a peer connected.
#[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))]
pub async fn on_peer_connected(&mut self, peer_id: &PeerId, authority_discovery_service: &mut AD) {
// check if it's an authority we've been waiting for
let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await;
@@ -6,7 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
thiserror = "1.0.22"
@@ -16,6 +17,7 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
[dev-dependencies]
log = "0.4.11"
env_logger = "0.8.2"
assert_matches = "1.4.0"
smallvec = "1.5.0"
@@ -19,7 +19,6 @@ use std::collections::HashMap;
use super::{LOG_TARGET, Result};
use futures::{StreamExt, task::Poll};
use log::warn;
use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt,
@@ -138,6 +137,7 @@ struct State {
/// or the relay-parent isn't in the active-leaves set, we ignore the message
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn distribute_collation<Context>(
ctx: &mut Context,
state: &mut State,
@@ -152,10 +152,10 @@ where
// This collation is not in the active-leaves set.
if !state.view.contains(&relay_parent) {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Distribute collation message parent {:?} is outside of our view",
relay_parent,
relay_parent = %relay_parent,
"distribute collation message parent is outside of our view",
);
return Ok(());
@@ -171,9 +171,11 @@ where
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Looks like no core is assigned to {:?} at {:?}", id, relay_parent,
para_id = %id,
relay_parent = %relay_parent,
"looks like no core is assigned to {} at {}", id, relay_parent,
);
return Ok(());
}
@@ -183,9 +185,10 @@ where
let our_validators = match determine_our_validators(ctx, our_core, num_cores, relay_parent).await? {
Some(validators) => validators,
None => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"There are no validators assigned to {:?} core", our_core,
core = ?our_core,
"there are no validators assigned to core",
);
return Ok(());
@@ -217,6 +220,7 @@ where
/// Get the Id of the Core that is assigned to the para being collated on if any
/// and the total number of cores.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn determine_core<Context>(
ctx: &mut Context,
para_id: ParaId,
@@ -241,6 +245,7 @@ where
/// Figure out a group of validators assigned to the para being collated on.
///
/// This returns validators for the current group and the next group.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn determine_our_validators<Context>(
ctx: &mut Context,
core_index: CoreIndex,
@@ -280,6 +285,7 @@ where
}
/// Issue a `Declare` collation message to a set of peers.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn declare<Context>(
ctx: &mut Context,
state: &mut State,
@@ -302,6 +308,7 @@ where
/// Issue a connection request to a set of validators and
/// revoke the previous connection request.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn connect_to_validators<Context>(
ctx: &mut Context,
relay_parent: Hash,
@@ -327,6 +334,7 @@ where
}
/// Advertise collation to a set of relay chain validators.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn advertise_collation<Context>(
ctx: &mut Context,
state: &mut State,
@@ -358,6 +366,7 @@ where
}
/// The main incoming message dispatching switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn process_msg<Context>(
ctx: &mut Context,
state: &mut State,
@@ -377,39 +386,39 @@ where
Some(id) if receipt.descriptor.para_id != id => {
// If the ParaId of a collation requested to be distributed does not match
// the one we expect, we ignore the message.
warn!(
tracing::warn!(
target: LOG_TARGET,
"DistributeCollation message for para {:?} while collating on {:?}",
receipt.descriptor.para_id,
id,
para_id = %receipt.descriptor.para_id,
collating_on = %id,
"DistributeCollation for unexpected para_id",
);
}
Some(id) => {
distribute_collation(ctx, state, id, receipt, pov).await?;
}
None => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"DistributeCollation message for para {:?} while not collating on any",
receipt.descriptor.para_id,
para_id = %receipt.descriptor.para_id,
"DistributeCollation message while not collating on any",
);
}
}
}
FetchCollation(_, _, _, _) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"FetchCollation message is not expected on the collator side of the protocol",
);
}
ReportCollator(_) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"ReportCollator message is not expected on the collator side of the protocol",
);
}
NoteGoodCollation(_) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"NoteGoodCollation message is not expected on the collator side of the protocol",
);
@@ -420,9 +429,10 @@ where
state,
event,
).await {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Failed to handle incoming network message: {:?}", e,
err = ?e,
"Failed to handle incoming network message",
);
}
},
@@ -432,6 +442,7 @@ where
}
/// Issue a response to a previously requested collation.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn send_collation<Context>(
ctx: &mut Context,
state: &mut State,
@@ -462,6 +473,7 @@ where
}
/// A networking messages switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_peer_message<Context>(
ctx: &mut Context,
state: &mut State,
@@ -475,13 +487,13 @@ where
match msg {
Declare(_) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Declare message is not expected on the collator side of the protocol",
);
}
AdvertiseCollation(_, _) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
@@ -494,24 +506,25 @@ where
send_collation(ctx, state, request_id, origin, collation.0, collation.1).await?;
}
} else {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Received a RequestCollation for {:?} while collating on {:?}",
para_id, our_para_id,
for_para_id = %para_id,
our_para_id = %our_para_id,
"received a RequestCollation for unexpected para_id",
);
}
}
None => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Received a RequestCollation for {:?} while not collating on any para",
para_id,
for_para_id = %para_id,
"received a RequestCollation while not collating on any para",
);
}
}
}
Collation(_, _, _) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Collation message is not expected on the collator side of the protocol",
);
@@ -522,6 +535,7 @@ where
}
/// Our view has changed.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
state: &mut State,
@@ -549,6 +563,7 @@ where
/// A validator is connected.
///
/// `Declare` that we are a collator with a given `CollatorId`.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_validator_connected<Context>(
ctx: &mut Context,
state: &mut State,
@@ -571,6 +586,7 @@ where
}
/// Bridge messages switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
ctx: &mut Context,
state: &mut State,
@@ -605,6 +621,7 @@ where
}
/// Handles our view changes.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change(
state: &mut State,
view: View,
@@ -624,6 +641,7 @@ async fn handle_our_view_change(
}
/// The collator protocol collator side main loop.
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run<Context>(
mut ctx: Context,
our_id: CollatorId,
@@ -646,10 +664,10 @@ where
if let Some(mut request) = state.last_connection_request.take() {
while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) {
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Failed to declare our collator id: {:?}",
err,
err = ?err,
"Failed to declare our collator id",
);
}
}
@@ -661,7 +679,7 @@ where
match msg? {
Communication { msg } => {
if let Err(e) = process_msg(&mut ctx, &mut state, msg).await {
warn!(target: LOG_TARGET, "Failed to process message: {}", e);
tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to process message");
}
},
Signal(ActiveLeaves(_update)) => {}
@@ -682,7 +700,6 @@ mod tests {
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use log::trace;
use smallvec::smallvec;
use sp_core::crypto::Pair;
@@ -839,7 +856,7 @@ mod tests {
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
msg: CollatorProtocolMessage,
) {
trace!("Sending message:\n{:?}", &msg);
tracing::trace!(msg = ?msg, "sending message");
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
@@ -854,7 +871,7 @@ mod tests {
.await
.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
trace!("Received message:\n{:?}", &msg);
tracing::trace!(msg = ?msg, "received message");
msg
}
@@ -863,7 +880,7 @@ mod tests {
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
timeout: Duration,
) -> Option<AllMessages> {
trace!("Waiting for message...");
tracing::trace!("waiting for message...");
overseer
.recv()
.timeout(timeout)
@@ -21,7 +21,6 @@
use std::time::Duration;
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use log::trace;
use thiserror::Error;
use polkadot_subsystem::{
@@ -96,6 +95,7 @@ impl CollatorProtocolSubsystem {
}
}
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run<Context>(self, ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
@@ -135,13 +135,16 @@ where
}
/// Modify the reputation of a peer based on its behavior.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
trace!(
tracing::trace!(
target: LOG_TARGET,
"Reputation change of {:?} for peer {:?}", rep, peer,
rep = ?rep,
peer_id = %peer,
"reputation change for peer",
);
ctx.send_message(AllMessages::NetworkBridge(
@@ -25,7 +25,6 @@ use futures::{
future::BoxFuture,
stream::FuturesUnordered,
};
use log::{trace, warn};
use polkadot_primitives::v1::{
Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV,
@@ -188,6 +187,7 @@ struct State {
}
/// Another subsystem has requested to fetch collations on a particular leaf for some para.
#[tracing::instrument(level = "trace", skip(ctx, state, tx), fields(subsystem = LOG_TARGET))]
async fn fetch_collation<Context>(
ctx: &mut Context,
state: &mut State,
@@ -206,9 +206,10 @@ where
if let Err(e) = tx.send((collation.1.clone(), collation.2.clone())) {
// We do not want this to be fatal because the receving subsystem
// may have closed the results channel for some reason.
trace!(
tracing::trace!(
target: LOG_TARGET,
"Failed to send collation: {:?}", e,
err = ?e,
"Failed to send collation",
);
}
return Ok(());
@@ -238,6 +239,7 @@ where
}
/// Report a collator for some malicious actions.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn report_collator<Context>(
ctx: &mut Context,
state: &mut State,
@@ -259,6 +261,7 @@ where
}
/// Some other subsystem has reported a collator as a good one, bump reputation.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn note_good_collation<Context>(
ctx: &mut Context,
state: &mut State,
@@ -279,6 +282,7 @@ where
/// A peer's view has changed. A number of things should be done:
/// - Ongoing collation requests have to be cancelled.
/// - Advertisements by this peer that are no longer relevant have to be removed.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_peer_view_change(
state: &mut State,
peer_id: PeerId,
@@ -320,6 +324,7 @@ async fn handle_peer_view_change(
/// - Cancel all ongoing requests
/// - Reply to interested parties if any
/// - Store collation.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn received_collation<Context>(
ctx: &mut Context,
state: &mut State,
@@ -368,6 +373,7 @@ where
/// - Check if the requested collation is in our view.
/// - Update PerRequest records with the `result` field if necessary.
/// And as such invocations of this function may rely on that.
#[tracing::instrument(level = "trace", skip(ctx, state, result), fields(subsystem = LOG_TARGET))]
async fn request_collation<Context>(
ctx: &mut Context,
state: &mut State,
@@ -380,19 +386,23 @@ where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
if !state.view.contains(&relay_parent) {
trace!(
tracing::trace!(
target: LOG_TARGET,
"Collation by {} on {} on relay parent {} is no longer in view",
peer_id, para_id, relay_parent,
peer_id = %peer_id,
para_id = %para_id,
relay_parent = %relay_parent,
"collation is no longer in view",
);
return Ok(());
}
if state.requested_collations.contains_key(&(relay_parent, para_id.clone(), peer_id.clone())) {
trace!(
tracing::trace!(
target: LOG_TARGET,
"Collation by {} on {} on relay parent {} has already been requested",
peer_id, para_id, relay_parent,
peer_id = %peer_id,
para_id = %para_id,
relay_parent = %relay_parent,
"collation has already been requested",
);
return Ok(());
}
@@ -436,6 +446,7 @@ where
}
/// Notify `CandidateSelectionSubsystem` that a collation has been advertised.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn notify_candidate_selection<Context>(
ctx: &mut Context,
collator: CollatorId,
@@ -457,6 +468,7 @@ where
}
/// Networking message has been received.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn process_incoming_peer_message<Context>(
ctx: &mut Context,
state: &mut State,
@@ -495,6 +507,7 @@ where
/// A leaf has become inactive so we want to
/// - Cancel all ongoing collation requests that are on top of that leaf.
/// - Remove all stored collations relevant to that leaf.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn remove_relay_parent(
state: &mut State,
relay_parent: Hash,
@@ -520,6 +533,7 @@ async fn remove_relay_parent(
}
/// Our view has changed.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change(
state: &mut State,
view: View,
@@ -543,6 +557,7 @@ async fn handle_our_view_change(
}
/// A request has timed out.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn request_timed_out<Context>(
ctx: &mut Context,
state: &mut State,
@@ -568,6 +583,7 @@ where
}
/// Bridge event switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
ctx: &mut Context,
state: &mut State,
@@ -601,6 +617,7 @@ where
}
/// The main message receiver switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn process_msg<Context>(
ctx: &mut Context,
msg: CollatorProtocolMessage,
@@ -613,13 +630,14 @@ where
match msg {
CollateOn(id) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"CollateOn({}) message is not expected on the validator side of the protocol", id,
para_id = %id,
"CollateOn message is not expected on the validator side of the protocol",
);
}
DistributeCollation(_, _) => {
warn!(
tracing::warn!(
target: LOG_TARGET,
"DistributeCollation message is not expected on the validator side of the protocol",
);
@@ -639,9 +657,10 @@ where
state,
event,
).await {
warn!(
tracing::warn!(
target: LOG_TARGET,
"Failed to handle incoming network message: {:?}", e,
err = ?e,
"Failed to handle incoming network message",
);
}
}
@@ -651,6 +670,7 @@ where
}
/// The main run loop.
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run<Context>(
mut ctx: Context,
request_timeout: Duration,
@@ -671,7 +691,7 @@ where
loop {
if let Poll::Ready(msg) = futures::poll!(ctx.recv()) {
let msg = msg?;
trace!(target: LOG_TARGET, "Received a message {:?}", msg);
tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
match msg {
Communication { msg } => process_msg(&mut ctx, msg, &mut state).await?,
@@ -687,7 +707,7 @@ where
// if the chain has not moved on yet.
match request {
CollationRequestResult::Timeout(id) => {
trace!(target: LOG_TARGET, "Request timed out {}", id);
tracing::trace!(target: LOG_TARGET, id, "request timed out");
request_timed_out(&mut ctx, &mut state, id).await?;
}
CollationRequestResult::Received(id) => {
@@ -784,7 +804,7 @@ mod tests {
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
msg: CollatorProtocolMessage,
) {
log::trace!("Sending message:\n{:?}", &msg);
tracing::trace!("Sending message:\n{:?}", &msg);
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
@@ -799,7 +819,7 @@ mod tests {
.await
.expect(&format!("{:?} is enough to receive messages.", TIMEOUT));
log::trace!("Received message:\n{:?}", &msg);
tracing::trace!("Received message:\n{:?}", &msg);
msg
}
@@ -808,7 +828,7 @@ mod tests {
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
timeout: Duration,
) -> Option<AllMessages> {
log::trace!("Waiting for message...");
tracing::trace!("Waiting for message...");
overseer
.recv()
.timeout(timeout)
@@ -826,7 +846,7 @@ mod tests {
} = test_harness;
let pair = CollatorPair::generate().0;
log::trace!("activating");
tracing::trace!("activating");
overseer_send(
&mut virtual_overseer,
@@ -6,7 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
@@ -116,6 +116,7 @@ fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV)
/// Handles the signal. If successful, returns `true` if the subsystem should conclude,
/// `false` otherwise.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_signal(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -134,10 +135,10 @@ async fn handle_signal(
let n_validators = match vals_rx.await? {
Ok(v) => v.len(),
Err(e) => {
log::warn!(
tracing::warn!(
target: LOG_TARGET,
"Error fetching validators from runtime API for active leaf: {:?}",
e
err = ?e,
"Error fetching validators from runtime API for active leaf",
);
// Not adding bookkeeping here might make us behave funny, but we
@@ -169,6 +170,7 @@ async fn handle_signal(
/// Notify peers that we are awaiting a given PoV hash.
///
/// This only notifies peers who have the relay parent in their view.
#[tracing::instrument(level = "trace", skip(peers, ctx), fields(subsystem = LOG_TARGET))]
async fn notify_all_we_are_awaiting(
peers: &mut HashMap<PeerId, PeerState>,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -195,6 +197,7 @@ async fn notify_all_we_are_awaiting(
}
/// Notify one peer about everything we're awaiting at a given relay-parent.
#[tracing::instrument(level = "trace", skip(ctx, relay_parent_state), fields(subsystem = LOG_TARGET))]
async fn notify_one_we_are_awaiting_many(
peer: &PeerId,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -219,6 +222,7 @@ async fn notify_one_we_are_awaiting_many(
}
/// Distribute a PoV to peers who are awaiting it.
#[tracing::instrument(level = "trace", skip(peers, ctx, metrics, pov), fields(subsystem = LOG_TARGET))]
async fn distribute_to_awaiting(
peers: &mut HashMap<PeerId, PeerState>,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -255,6 +259,7 @@ async fn distribute_to_awaiting(
}
/// Handles a `FetchPoV` message.
#[tracing::instrument(level = "trace", skip(ctx, state, response_sender), fields(subsystem = LOG_TARGET))]
async fn handle_fetch(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -286,8 +291,10 @@ async fn handle_fetch(
}
if relay_parent_state.fetching.len() > 2 * relay_parent_state.n_validators {
log::warn!("Other subsystems have requested PoV distribution to \
fetch more PoVs than reasonably expected: {}", relay_parent_state.fetching.len());
tracing::warn!(
relay_parent_state.fetching.len = relay_parent_state.fetching.len(),
"other subsystems have requested PoV distribution to fetch more PoVs than reasonably expected",
);
return Ok(());
}
@@ -301,6 +308,7 @@ async fn handle_fetch(
}
/// Handles a `DistributePoV` message.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn handle_distribute(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -336,6 +344,7 @@ async fn handle_distribute(
}
/// Report a reputation change for a peer.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn report_peer(
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
peer: PeerId,
@@ -345,6 +354,7 @@ async fn report_peer(
}
/// Handle a notification from a peer that they are awaiting some PoVs.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_awaiting(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -359,7 +369,7 @@ async fn handle_awaiting(
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
log::warn!("PoV Distribution relay parent state out-of-sync with our view");
tracing::warn!("PoV Distribution relay parent state out-of-sync with our view");
return Ok(());
}
Some(s) => s,
@@ -399,6 +409,7 @@ async fn handle_awaiting(
/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
///
/// Completes any requests awaiting that PoV.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_pov(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -466,6 +477,7 @@ async fn handle_incoming_pov(
}
/// Handles a network bridge update.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -537,6 +549,7 @@ impl PoVDistribution {
Self { metrics }
}
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run(
self,
mut ctx: impl SubsystemContext<Message = PoVDistributionMessage>,
@@ -7,7 +7,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
polkadot-primitives = { path = "../../../primitives" }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
@@ -111,7 +111,7 @@ impl VcPerPeerTracker {
/// based on a message that we have sent it from our local pool.
fn note_local(&mut self, h: CandidateHash) {
if !note_hash(&mut self.local_observed, h) {
log::warn!("Statement distribution is erroneously attempting to distribute more \
tracing::warn!("Statement distribution is erroneously attempting to distribute more \
than {} candidate(s) per validator index. Ignoring", VC_THRESHOLD);
}
}
@@ -164,6 +164,7 @@ impl PeerRelayParentKnowledge {
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn send(&mut self, fingerprint: &(CompactStatement, ValidatorIndex)) -> Option<bool> {
let already_known = self.sent_statements.contains(fingerprint)
|| self.received_statements.contains(fingerprint);
@@ -212,6 +213,7 @@ impl PeerRelayParentKnowledge {
///
/// This returns `Ok(true)` if this is the first time the peer has become aware of a
/// candidate with given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn receive(
&mut self,
fingerprint: &(CompactStatement, ValidatorIndex),
@@ -278,6 +280,7 @@ impl PeerData {
///
/// This returns `Some(true)` if this is the first time the peer has become aware of a
/// candidate with the given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn send(
&mut self,
relay_parent: &Hash,
@@ -302,6 +305,7 @@ impl PeerData {
///
/// This returns `Ok(true)` if this is the first time the peer has become aware of a
/// candidate with given hash.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn receive(
&mut self,
relay_parent: &Hash,
@@ -411,6 +415,7 @@ impl ActiveHeadData {
///
/// Any other statements or those that reference a candidate we are not aware of cannot be accepted
/// and will return `NotedStatement::NotUseful`.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn note_statement(&mut self, statement: SignedFullStatement) -> NotedStatement {
let validator_index = statement.validator_index();
let comparator = StoredStatementComparator {
@@ -490,6 +495,7 @@ fn check_statement_signature(
/// Informs all registered listeners about a newly received statement.
///
/// Removes all closed listeners.
#[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))]
async fn inform_statement_listeners(
statement: &SignedFullStatement,
listeners: &mut Vec<mpsc::Sender<SignedFullStatement>>,
@@ -509,6 +515,7 @@ async fn inform_statement_listeners(
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
/// them but now can.
#[tracing::instrument(level = "trace", skip(peers, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))]
async fn circulate_statement_and_dependents(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
@@ -564,6 +571,7 @@ fn statement_message(relay_parent: Hash, statement: SignedFullStatement)
/// Circulates a statement to all peers who have not seen it yet, and returns
/// an iterator over peers who need to have dependent statements sent.
#[tracing::instrument(level = "trace", skip(peers, ctx), fields(subsystem = LOG_TARGET))]
async fn circulate_statement(
peers: &mut HashMap<PeerId, PeerData>,
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
@@ -597,6 +605,7 @@ async fn circulate_statement(
}
/// Send all statements about a given candidate hash to a peer.
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_head, metrics), fields(subsystem = LOG_TARGET))]
async fn send_statements_about(
peer: PeerId,
peer_data: &mut PeerData,
@@ -625,6 +634,7 @@ async fn send_statements_about(
}
/// Send all statements at a given relay-parent to a peer.
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_head, metrics), fields(subsystem = LOG_TARGET))]
async fn send_statements(
peer: PeerId,
peer_data: &mut PeerData,
@@ -666,6 +676,7 @@ async fn report_peer(
//
// This function checks the signature and ensures the statement is compatible with our
// view.
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_message<'a>(
peer: PeerId,
peer_data: &mut PeerData,
@@ -688,7 +699,10 @@ async fn handle_incoming_message<'a>(
None => {
// This should never be out-of-sync with our view if the view updates
// correspond to actual `StartWork` messages. So we just log and ignore.
log::warn!("Our view out-of-sync with active heads. Head {} not found", relay_parent);
tracing::warn!(
requested_relay_parent = %relay_parent,
"our view out-of-sync with active heads; head not found",
);
return Ok(None);
}
};
@@ -741,6 +755,7 @@ async fn handle_incoming_message<'a>(
}
/// Update a peer's view. Sends all newly unlocked statements based on the previous
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))]
async fn update_peer_view_and_send_unlocked(
peer: PeerId,
peer_data: &mut PeerData,
@@ -777,6 +792,7 @@ async fn update_peer_view_and_send_unlocked(
Ok(())
}
#[tracing::instrument(level = "trace", skip(peers, active_heads, ctx, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
peers: &mut HashMap<PeerId, PeerData>,
active_heads: &mut HashMap<Hash, ActiveHeadData>,
@@ -847,9 +863,13 @@ async fn handle_network_update(
for new in our_view.difference(&old_view) {
if !active_heads.contains_key(&new) {
log::warn!(target: LOG_TARGET, "Our network bridge view update \
tracing::warn!(
target: LOG_TARGET,
unknown_hash = %new,
"Our network bridge view update \
inconsistent with `StartWork` messages we have received from overseer. \
Contains unknown hash {}", new);
Contains unknown hash.",
);
}
}
@@ -860,6 +880,7 @@ async fn handle_network_update(
}
impl StatementDistribution {
#[tracing::instrument(skip(self, ctx), fields(subsystem = LOG_TARGET))]
async fn run(
self,
mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>,
@@ -899,10 +920,10 @@ impl StatementDistribution {
match (val_rx.await?, session_rx.await?) {
(Ok(v), Ok(s)) => (v, s),
(Err(e), _) | (_, Err(e)) => {
log::warn!(
tracing::warn!(
target: LOG_TARGET,
"Failed to fetch runtime API data for active leaf: {:?}",
e,
err = ?e,
"Failed to fetch runtime API data for active leaf",
);
// Lacking this bookkeeping might make us behave funny, although
+2 -2
View File
@@ -6,7 +6,8 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../primitives" }
@@ -22,5 +23,4 @@ polkadot-node-network-protocol = { path = "../network/protocol" }
futures = { version = "0.3.8", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.1.1"
log = "0.4.11"
kv-log-macro = "1.0.7"
@@ -25,7 +25,6 @@ use futures::{
FutureExt, StreamExt,
};
use futures_timer::Delay;
use kv_log_macro as log;
use polkadot_primitives::v1::{BlockData, PoV};
use polkadot_overseer::{Overseer, AllSubsystems};
@@ -43,13 +42,13 @@ impl Subsystem1 {
match ctx.try_recv().await {
Ok(Some(msg)) => {
if let FromOverseer::Communication { msg } = msg {
log::info!("msg {:?}", msg);
tracing::info!("msg {:?}", msg);
}
continue;
}
Ok(None) => (),
Err(_) => {
log::info!("exiting");
tracing::info!("exiting");
return;
}
}
@@ -94,7 +93,7 @@ impl Subsystem2 {
"subsystem-2-job",
Box::pin(async {
loop {
log::info!("Job tick");
tracing::info!("Job tick");
Delay::new(Duration::from_secs(1)).await;
}
}),
@@ -103,12 +102,12 @@ impl Subsystem2 {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
log::info!("Subsystem2 received message {:?}", msg);
tracing::info!("Subsystem2 received message {:?}", msg);
continue;
}
Ok(None) => { pending!(); }
Err(_) => {
log::info!("exiting");
tracing::info!("exiting");
return;
},
}
@@ -159,7 +158,7 @@ fn main() {
select! {
_ = overseer_fut => break,
_ = timer_stream.next() => {
log::info!("tick");
tracing::info!("tick");
}
complete => break,
}
+19 -4
View File
@@ -135,6 +135,7 @@ enum ToOverseer {
/// This structure exists solely for the purposes of decoupling
/// `Overseer` code from the client code and the necessity to call
/// `HeaderBackend::block_number_from_id()`.
#[derive(Debug)]
pub struct BlockInfo {
/// hash of the block.
pub hash: Hash,
@@ -191,16 +192,19 @@ pub struct OverseerHandler {
impl OverseerHandler {
/// Inform the `Overseer` that that some block was imported.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImported(block)).await.map_err(Into::into)
}
/// Send some message to one of the `Subsystem`s.
#[tracing::instrument(level = "trace", skip(self, msg), fields(subsystem = LOG_TARGET))]
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) -> SubsystemResult<()> {
self.events_tx.send(Event::MsgToSubsystem(msg.into())).await.map_err(Into::into)
}
/// Inform the `Overseer` that that some block was finalized.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockFinalized(block)).await.map_err(Into::into)
}
@@ -212,6 +216,7 @@ impl OverseerHandler {
/// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas,
/// the response channel may never return if the hash was deactivated before this call.
/// In this case, it's the caller's responsibility to ensure a timeout is set.
#[tracing::instrument(level = "trace", skip(self, response_channel), fields(subsystem = LOG_TARGET))]
pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<SubsystemResult<()>>) -> SubsystemResult<()> {
self.events_tx.send(Event::ExternalRequest(ExternalRequest::WaitForActivation {
hash,
@@ -220,6 +225,7 @@ impl OverseerHandler {
}
/// Tell `Overseer` to shutdown.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn stop(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::Stop).await.map_err(Into::into)
}
@@ -1287,6 +1293,7 @@ where
}
/// Run the `Overseer`.
#[tracing::instrument(skip(self), fields(subsystem = LOG_TARGET))]
pub async fn run(mut self) -> SubsystemResult<()> {
let leaves = std::mem::take(&mut self.leaves);
let mut update = ActiveLeavesUpdate::default();
@@ -1337,7 +1344,7 @@ where
// Some subsystem exited? It's time to panic.
if let Poll::Ready(Some(finished)) = poll!(self.running_subsystems.next()) {
log::error!(target: LOG_TARGET, "Subsystem finished unexpectedly {:?}", finished);
tracing::error!(target: LOG_TARGET, subsystem = ?finished, "subsystem finished unexpectedly");
self.stop().await;
return finished;
}
@@ -1347,6 +1354,7 @@ where
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
let mut update = ActiveLeavesUpdate::default();
@@ -1376,6 +1384,7 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
let mut update = ActiveLeavesUpdate::default();
@@ -1399,6 +1408,7 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
if let Some(ref mut s) = self.candidate_validation_subsystem.instance {
s.tx.send(FromOverseer::Signal(signal.clone())).await?;
@@ -1463,6 +1473,7 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn route_message(&mut self, msg: AllMessages) {
self.metrics.on_message_relayed();
match msg {
@@ -1544,6 +1555,7 @@ where
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn on_head_activated(&mut self, hash: &Hash) {
self.metrics.on_head_activated();
if let Some(listeners) = self.activation_external_listeners.remove(hash) {
@@ -1554,6 +1566,7 @@ where
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn on_head_deactivated(&mut self, hash: &Hash) {
self.metrics.on_head_deactivated();
if let Some(listeners) = self.activation_external_listeners.remove(hash) {
@@ -1562,6 +1575,7 @@ where
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn clean_up_external_listeners(&mut self) {
self.activation_external_listeners.retain(|_, v| {
// remove dead listeners
@@ -1570,6 +1584,7 @@ where
})
}
#[tracing::instrument(level = "trace", skip(self, request), fields(subsystem = LOG_TARGET))]
fn handle_external_request(&mut self, request: ExternalRequest) {
match request {
ExternalRequest::WaitForActivation { hash, response_channel } => {
@@ -1607,9 +1622,9 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
let fut = Box::pin(async move {
if let Err(e) = future.await {
log::error!("Subsystem {} exited with error {:?}", name, e);
tracing::error!(subsystem=name, err = ?e, "subsystem exited with error");
} else {
log::debug!("Subsystem {} exited without an error", name);
tracing::debug!(subsystem=name, "subsystem exited without an error");
}
let _ = tx.send(());
});
@@ -1617,7 +1632,7 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
spawner.spawn(name, fut);
let _ = streams.push(from_rx);
futures.push(Box::pin(rx.map(|e| { log::warn!("Dropping error {:?}", e); Ok(()) })));
futures.push(Box::pin(rx.map(|e| { tracing::warn!(err = ?e, "dropping error"); Ok(()) })));
let instance = Some(SubsystemInstance {
tx: to_tx,
+2 -1
View File
@@ -55,7 +55,8 @@ prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https:
futures = "0.3.8"
hex-literal = "0.3.1"
lazy_static = "1.4.0"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
parking_lot = "0.11.1"
serde = { version = "1.0.117", features = ["derive"] }
slog = "2.5.2"
+5 -2
View File
@@ -28,7 +28,7 @@ use {
std::convert::TryInto,
std::time::Duration,
log::info,
tracing::info,
polkadot_node_core_av_store::Config as AvailabilityConfig,
polkadot_node_core_proposer::ProposerFactory,
polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandler},
@@ -704,7 +704,10 @@ pub fn new_full<RuntimeApi, Executor>(
// given delay.
let voting_rule = match grandpa_pause {
Some((block, delay)) => {
info!("GRANDPA scheduled voting pause set for block #{} with a duration of {} blocks.",
info!(
block_number = %block,
delay = %delay,
"GRANDPA scheduled voting pause set for block #{} with a duration of {} blocks.",
block,
delay,
);
@@ -9,7 +9,8 @@ description = "Subsystem traits and message definitions"
async-trait = "0.1.41"
futures = "0.3.8"
futures-timer = "3.0.2"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] }
parking_lot = "0.11.1"
pin-project = "1.0.2"
+5 -3
View File
@@ -9,12 +9,13 @@ description = "Subsystem traits and message definitions"
async-trait = "0.1.41"
futures = "0.3.8"
futures-timer = "3.0.2"
log = "0.4.11"
thiserror = "1.0.22"
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] }
parking_lot = { version = "0.11.1", optional = true }
pin-project = "1.0.2"
streamunordered = "0.5.1"
thiserror = "1.0.22"
tracing = "0.1.21"
tracing-futures = "0.2.4"
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem = { path = "../subsystem" }
@@ -29,7 +30,8 @@ substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate
[dev-dependencies]
assert_matches = "1.4.0"
async-trait = "0.1.41"
env_logger = "0.8.2"
futures = { version = "0.3.8", features = ["thread-pool"] }
log = "0.4.11"
parking_lot = "0.11.1"
polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
env_logger = "0.8.2"
+27 -14
View File
@@ -579,11 +579,11 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
let (future, abort_handle) = future::abortable(async move {
if let Err(e) = Job::run(parent_hash, run_args, metrics, to_job_rx, from_job_tx).await {
log::error!(
"{}({}) finished with an error {:?}",
Job::NAME,
parent_hash,
e,
tracing::error!(
job = Job::NAME,
parent_hash = %parent_hash,
err = ?e,
"job finished with an error",
);
if let Some(mut err_tx) = err_tx {
@@ -591,7 +591,7 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
// there's no point trying to propagate this error onto the channel too
// all we can do is warn that error propagation has failed
if let Err(e) = err_tx.send((Some(parent_hash), JobsError::Job(e))).await {
log::warn!("failed to forward error: {:?}", e);
tracing::warn!(err = ?e, "failed to forward error");
}
}
}
@@ -632,7 +632,7 @@ impl<Spawner: SpawnNamed, Job: 'static + JobTrait> Jobs<Spawner, Job> {
async fn send_msg(&mut self, parent_hash: Hash, msg: Job::ToJob) {
if let Entry::Occupied(mut job) = self.running.entry(parent_hash) {
if job.get_mut().send_msg(msg).await.is_err() {
log::debug!("failed to send message to job ({}), will remove it", Job::NAME);
tracing::debug!(job = Job::NAME, "failed to send message to job, will remove it");
job.remove();
}
}
@@ -767,7 +767,7 @@ where
// if we can't send on the error transmission channel, we can't do anything useful about it
// still, we can at least log the failure
if let Err(e) = err_tx.send((hash, err)).await {
log::warn!("failed to forward error: {:?}", e);
tracing::warn!(err = ?e, "failed to forward error");
}
}
}
@@ -792,7 +792,11 @@ where
for hash in activated {
let metrics = metrics.clone();
if let Err(e) = jobs.spawn_job(hash, run_args.clone(), metrics) {
log::error!("Failed to spawn a job({}): {:?}", Job::NAME, e);
tracing::error!(
job = Job::NAME,
err = ?e,
"failed to spawn a job",
);
Self::fwd_err(Some(hash), JobsError::Utility(e), err_tx).await;
return true;
}
@@ -821,7 +825,11 @@ where
.forward(drain())
.await
{
log::error!("failed to stop all jobs ({}) on conclude signal: {:?}", Job::NAME, e);
tracing::error!(
job = Job::NAME,
err = ?e,
"failed to stop a job on conclude signal",
);
let e = Error::from(e);
Self::fwd_err(None, JobsError::Utility(e), err_tx).await;
}
@@ -832,16 +840,20 @@ where
if let Ok(to_job) = <Job::ToJob>::try_from(msg) {
match to_job.relay_parent() {
Some(hash) => jobs.send_msg(hash, to_job).await,
None => log::debug!(
"Trying to send a message to a job ({}) without specifying a relay parent.",
Job::NAME,
None => tracing::debug!(
job = Job::NAME,
"trying to send a message to a job without specifying a relay parent",
),
}
}
}
Ok(Signal(BlockFinalized(_))) => {}
Err(err) => {
log::error!("error receiving message from subsystem context for job ({}): {:?}", Job::NAME, err);
tracing::error!(
job = Job::NAME,
err = ?err,
"error receiving message from subsystem context for job",
);
Self::fwd_err(None, JobsError::Utility(Error::from(err)), err_tx).await;
return true;
}
@@ -956,6 +968,7 @@ macro_rules! delegated_subsystem {
}
/// Run this subsystem
#[tracing::instrument(skip(ctx, run_args, metrics, spawner), fields(subsystem = $subsystem_name))]
pub async fn run(ctx: Context, run_args: $run_args, metrics: $metrics, spawner: Spawner) {
<Manager<Spawner, Context>>::run(ctx, run_args, metrics, spawner, None).await
}
@@ -218,7 +218,7 @@ impl ConnectionRequest {
/// This can be done either by calling this function or dropping the request.
pub fn revoke(self) {
if let Err(_) = self.revoke.send(()) {
log::warn!(
tracing::warn!(
"Failed to revoke a validator connection request",
);
}
+2 -1
View File
@@ -10,7 +10,8 @@ async-trait = "0.1.41"
derive_more = "0.99.11"
futures = "0.3.8"
futures-timer = "3.0.2"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] }
parking_lot = { version = "0.11.1", optional = true }
pin-project = "1.0.2"
+1 -1
View File
@@ -239,7 +239,7 @@ where
Err(_) => return Ok(()),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(overseer_msg) => {
log::debug!(
tracing::debug!(
target: "dummy-subsystem",
"Discarding a message sent from overseer {:?}",
overseer_msg
+2 -1
View File
@@ -8,7 +8,8 @@ edition = "2018"
futures = "0.3.8"
futures01 = { package = "futures", version = "0.1.29" }
hex = "0.4.2"
log = "0.4.11"
tracing = "0.1.21"
tracing-futures = "0.2.4"
rand = "0.7.3"
tempfile = "3.1.0"
+8 -8
View File
@@ -469,8 +469,8 @@ impl From<u32> for CoreIndex {
}
/// The unique (during session) index of a validator group.
#[derive(Encode, Decode, Default, Clone, Copy)]
#[cfg_attr(feature = "std", derive(Eq, Hash, PartialEq, Debug))]
#[derive(Encode, Decode, Default, Clone, Copy, Debug)]
#[cfg_attr(feature = "std", derive(Eq, Hash, PartialEq))]
pub struct GroupIndex(pub u32);
impl From<u32> for GroupIndex {
@@ -571,8 +571,8 @@ impl<N: Saturating + BaseArithmetic + Copy> GroupRotationInfo<N> {
}
/// Information about a core which is currently occupied.
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq, Debug))]
#[derive(Clone, Encode, Decode, Debug)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub struct OccupiedCore<N = BlockNumber> {
/// The ID of the para occupying the core.
pub para_id: Id,
@@ -596,8 +596,8 @@ pub struct OccupiedCore<N = BlockNumber> {
}
/// Information about a core which is currently occupied.
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq, Debug, Default))]
#[derive(Clone, Encode, Decode, Debug)]
#[cfg_attr(feature = "std", derive(PartialEq, Default))]
pub struct ScheduledCore {
/// The ID of a para scheduled.
pub para_id: Id,
@@ -606,8 +606,8 @@ pub struct ScheduledCore {
}
/// The state of a particular availability core.
#[derive(Clone, Encode, Decode)]
#[cfg_attr(feature = "std", derive(PartialEq, Debug))]
#[derive(Clone, Encode, Decode, Debug)]
#[cfg_attr(feature = "std", derive(PartialEq))]
pub enum CoreState<N = BlockNumber> {
/// The core is currently occupied.
#[codec(index = "0")]