diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 9bbe334f7e..a892de4394 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -266,16 +266,34 @@ dependencies = [ ] [[package]] -name = "async-std" -version = "1.6.5" +name = "async-process" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9fa76751505e8df1c7a77762f60486f60c71bbd9b8557f4da6ad47d083732ed" +checksum = "4c8cea09c1fb10a317d1b5af8024eeba256d6554763e85ecd90ff8df31c7bbda" dependencies = [ + "async-io", + "blocking", + "cfg-if 0.1.10", + "event-listener", + "futures-lite", + "once_cell", + "signal-hook", + "winapi 0.3.9", +] + +[[package]] +name = "async-std" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f9f84f1280a2b436a2c77c2582602732b6c2f4321d5494d6e799e6c367859a8" +dependencies = [ + "async-channel", "async-global-executor", "async-io", "async-mutex", + "async-process", "blocking", - "crossbeam-utils", + "crossbeam-utils 0.8.1", "futures-channel", "futures-core", "futures-io", @@ -286,7 +304,7 @@ dependencies = [ "memchr", "num_cpus", "once_cell", - "pin-project-lite 0.1.7", + "pin-project-lite 0.2.0", "pin-utils", "slab", "wasm-bindgen-futures", @@ -936,7 +954,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285" dependencies = [ "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -948,7 +966,7 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg 1.0.0", "cfg-if 0.1.10", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "lazy_static", "maybe-uninit", "memoffset", @@ -962,7 +980,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" dependencies = [ "cfg-if 0.1.10", - "crossbeam-utils", + "crossbeam-utils 0.7.2", 
"maybe-uninit", ] @@ -977,6 +995,17 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02d96d1e189ef58269ebe5b97953da3274d83a93af647c2ddd6f9dab28cedb8d" +dependencies = [ + "autocfg 1.0.0", + "cfg-if 1.0.0", + "lazy_static", +] + [[package]] name = "crunchy" version = "0.2.2" @@ -1465,6 +1494,16 @@ dependencies = [ "parity-scale-codec", ] +[[package]] +name = "form_urlencoded" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ece68d15c92e84fa4f19d3780f1294e5ca82a78a6d515f1efaabcc144688be00" +dependencies = [ + "matches", + "percent-encoding 2.1.0", +] + [[package]] name = "frame-benchmarking" version = "2.0.0" @@ -2369,6 +2408,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b141fdc7836c525d4d594027d318c84161ca17aaf8113ab1f81ab93ae897485" +[[package]] +name = "integer-encoding" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4ebd0bd29be0f11973e9b3e219005661042a019fd757798c36a47c87852625" + [[package]] name = "integer-sqrt" version = "0.1.3" @@ -3178,7 +3223,7 @@ dependencies = [ "rustls 0.19.0", "rw-stream-sink", "soketto", - "url 2.1.1", + "url 2.2.0", "webpki", "webpki-roots", ] @@ -3424,6 +3469,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "mick-jaeger" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68c751e6643309568aa78a725b75755c11d866d6d0d0f7209033142007971cdd" +dependencies = [ + "futures 0.3.8", + "rand 0.7.3", + "thrift", +] + [[package]] name = "minicbor" version = "0.7.0" @@ -3794,6 +3850,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" +[[package]] +name = "ordered-float" +version = "1.1.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "3741934be594d77de1c8461ebcbbe866f585ea616a9753aa78f2bdc69f0e4579" +dependencies = [ + "num-traits 0.2.12", +] + [[package]] name = "output_vt100" version = "0.1.2" @@ -4385,7 +4450,7 @@ dependencies = [ "serde", "static_assertions", "unsigned-varint", - "url 2.1.1", + "url 2.2.0", ] [[package]] @@ -4496,7 +4561,7 @@ dependencies = [ "rand 0.7.3", "sha-1", "slab", - "url 2.1.1", + "url 2.2.0", ] [[package]] @@ -5133,10 +5198,14 @@ name = "polkadot-node-subsystem" version = "0.1.0" dependencies = [ "assert_matches", + "async-std", "async-trait", "derive_more", "futures 0.3.8", "futures-timer 3.0.2", + "lazy_static", + "log", + "mick-jaeger", "parity-scale-codec", "parking_lot 0.11.1", "pin-project 1.0.2", @@ -6269,7 +6338,7 @@ checksum = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280" dependencies = [ "crossbeam-deque", "crossbeam-queue", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "lazy_static", "num_cpus", ] @@ -6487,7 +6556,7 @@ dependencies = [ "base64 0.11.0", "blake2b_simd", "constant_time_eq", - "crossbeam-utils", + "crossbeam-utils 0.7.2", ] [[package]] @@ -7829,6 +7898,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" +[[package]] +name = "signal-hook" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604508c1418b99dfe1925ca9224829bb2a8a9a04dda655cc01fcad46f4ab05ed" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.2.0" @@ -9056,6 +9135,19 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "thrift" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c6d965454947cc7266d22716ebfd07b18d84ebaf35eec558586bbb2a8cb6b5b" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + 
"ordered-float", + "threadpool", +] + [[package]] name = "time" version = "0.1.43" @@ -9184,7 +9276,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.7.2", "futures 0.1.29", ] @@ -9240,7 +9332,7 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.7.2", "futures 0.1.29", "lazy_static", "log", @@ -9306,7 +9398,7 @@ checksum = "df720b6581784c118f0eb4310796b12b1d242a7eb95f716a8367855325c25f89" dependencies = [ "crossbeam-deque", "crossbeam-queue", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "futures 0.1.29", "lazy_static", "log", @@ -9321,7 +9413,7 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.7.2", "futures 0.1.29", "slab", "tokio-executor", @@ -9643,10 +9735,11 @@ dependencies = [ [[package]] name = "url" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" +checksum = "5909f2b0817350449ed73e8bcd81c8c3c8d9a7a5d8acba4b27db277f1868976e" dependencies = [ + "form_urlencoded", "idna 0.2.0", "matches", "percent-encoding 2.1.0", diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs index bf9c7a242b..8c4bfb1e74 100644 --- a/polkadot/cli/src/cli.rs +++ b/polkadot/cli/src/cli.rs @@ -91,6 +91,13 @@ pub struct RunCmd { /// elapsed (i.e. until a block at height `pause_block + delay` is imported). 
#[structopt(long = "grandpa-pause", number_of_values(2))] pub grandpa_pause: Vec, + + /// Add the destination address to the jaeger agent. + /// + /// Must be valid socket address, of format `IP:Port` + /// commonly `127.0.0.1:6831`. + #[structopt(long)] + pub jaeger_agent: Option, } #[allow(missing_docs)] @@ -98,7 +105,6 @@ pub struct RunCmd { pub struct Cli { #[structopt(subcommand)] pub subcommand: Option, - #[structopt(flatten)] pub run: RunCmd, } diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index d09de02a12..a81755634a 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -162,17 +162,19 @@ pub fn run() -> Result<()> { info!("----------------------------"); } + let jaeger_agent = cli.run.jaeger_agent; Ok(runner.run_node_until_exit(move |config| async move { let role = config.role.clone(); let task_manager = match role { Role::Light => service::build_light(config).map(|(task_manager, _)| task_manager) - .map_err(|e| sc_service::Error::Other(e.to_string()) ), + .map_err(|e| sc_service::Error::Other(e.to_string())), _ => service::build_full( config, service::IsCollator::No, grandpa_pause, + jaeger_agent, ).map(|full| full.task_manager) .map_err(|e| sc_service::Error::Other(e.to_string()) ) }; @@ -191,7 +193,7 @@ pub fn run() -> Result<()> { set_default_ss58_version(chain_spec); runner.async_run(|mut config| { - let (client, _, import_queue, task_manager) = service::new_chain_ops(&mut config) + let (client, _, import_queue, task_manager) = service::new_chain_ops(&mut config, None) .map_err(|e| sc_service::Error::Other(e.to_string()))?; Ok((cmd.run(client, import_queue), task_manager)) }) @@ -203,7 +205,7 @@ pub fn run() -> Result<()> { set_default_ss58_version(chain_spec); runner.async_run(|mut config| { - let (client, _, _, task_manager) = service::new_chain_ops(&mut config) + let (client, _, _, task_manager) = service::new_chain_ops(&mut config, None) .map_err(|e| sc_service::Error::Other(e.to_string()))?; 
Ok((cmd.run(client, config.database), task_manager)) }) @@ -215,7 +217,7 @@ pub fn run() -> Result<()> { set_default_ss58_version(chain_spec); runner.async_run(|mut config| { - let (client, _, _, task_manager) = service::new_chain_ops(&mut config) + let (client, _, _, task_manager) = service::new_chain_ops(&mut config, None) .map_err(|e| sc_service::Error::Other(e.to_string()))?; Ok((cmd.run(client, config.chain_spec), task_manager)) }) @@ -227,7 +229,7 @@ pub fn run() -> Result<()> { set_default_ss58_version(chain_spec); runner.async_run(|mut config| { - let (client, _, import_queue, task_manager) = service::new_chain_ops(&mut config) + let (client, _, import_queue, task_manager) = service::new_chain_ops(&mut config, None) .map_err(|e| sc_service::Error::Other(e.to_string()))?; Ok((cmd.run(client, import_queue), task_manager)) }) @@ -244,7 +246,7 @@ pub fn run() -> Result<()> { set_default_ss58_version(chain_spec); runner.async_run(|mut config| { - let (client, backend, _, task_manager) = service::new_chain_ops(&mut config) + let (client, backend, _, task_manager) = service::new_chain_ops(&mut config, None) .map_err(|e| sc_service::Error::Other(e.to_string()))?; Ok((cmd.run(client, backend), task_manager)) }) diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 54dbe295f7..492e57dc4d 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -37,6 +37,7 @@ use polkadot_node_primitives::{ FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult, }; use polkadot_subsystem::{ + jaeger, messages::{ AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage, CandidateValidationMessage, PoVDistributionMessage, ProvisionableData, @@ -457,10 +458,12 @@ impl CandidateBackingJob { async fn run_loop( mut self, mut rx_to: mpsc::Receiver, + span: &jaeger::JaegerSpan ) -> Result<(), Error> { loop { futures::select! 
{ validated_command = self.background_validation.next() => { + let _span = span.child("background validation"); if let Some(c) = validated_command { self.handle_validated_candidate_command(c).await?; } else { @@ -470,6 +473,7 @@ impl CandidateBackingJob { to_job = rx_to.next() => match to_job { None => break, Some(msg) => { + let _span = span.child("process message"); self.process_msg(msg).await?; } } @@ -870,6 +874,9 @@ impl util::JobTrait for CandidateBackingJob { } } + let span = jaeger::hash_span(&parent, "run:backing"); + let _span = span.child("runtime apis"); + let (validators, groups, session_index, cores) = futures::try_join!( try_runtime_api!(request_validators(parent, &mut tx_from).await), try_runtime_api!(request_validator_groups(parent, &mut tx_from).await), @@ -886,6 +893,9 @@ impl util::JobTrait for CandidateBackingJob { let session_index = try_runtime_api!(session_index); let cores = try_runtime_api!(cores); + drop(_span); + let _span = span.child("validator construction"); + let signing_context = SigningContext { parent_hash: parent, session_index }; let validator = match Validator::construct( &validators, @@ -905,6 +915,10 @@ impl util::JobTrait for CandidateBackingJob { } }; + drop(_span); + let _span = span.child("calc validator groups"); + + let mut groups = HashMap::new(); let n_cores = cores.len(); @@ -936,6 +950,9 @@ impl util::JobTrait for CandidateBackingJob { Some((assignment, required_collator)) => (Some(assignment), required_collator), }; + drop(_span); + let _span = span.child("wait for candidate backing job"); + let (background_tx, background_rx) = mpsc::channel(16); let job = CandidateBackingJob { parent, @@ -954,10 +971,10 @@ impl util::JobTrait for CandidateBackingJob { background_validation_tx: background_tx, metrics, }; + drop(_span); - job.run_loop(rx_to).await - } - .boxed() + job.run_loop(rx_to, &span).await + }.boxed() } } diff --git a/polkadot/node/core/bitfield-signing/src/lib.rs 
b/polkadot/node/core/bitfield-signing/src/lib.rs index 92478afb13..e02bd5661f 100644 --- a/polkadot/node/core/bitfield-signing/src/lib.rs +++ b/polkadot/node/core/bitfield-signing/src/lib.rs @@ -23,6 +23,7 @@ use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future}; use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr}; use polkadot_node_subsystem::{ + jaeger, messages::{ AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest, @@ -75,7 +76,9 @@ async fn get_core_availability( validator_idx: ValidatorIndex, sender: &Mutex<&mut mpsc::Sender>, ) -> Result { + let span = jaeger::hash_span(&relay_parent, "core_availability"); if let CoreState::Occupied(core) = core { + let _span = span.child("occupied"); let (tx, rx) = oneshot::channel(); sender .lock() @@ -97,6 +100,10 @@ async fn get_core_availability( return Ok(false); } }; + + drop(_span); + let _span = span.child("query chunk"); + let (tx, rx) = oneshot::channel(); sender .lock() @@ -120,6 +127,7 @@ async fn get_availability_cores( relay_parent: Hash, sender: &mut mpsc::Sender, ) -> Result, Error> { + let _span = jaeger::hash_span(&relay_parent, "get availability cores"); let (tx, rx) = oneshot::channel(); sender .send(AllMessages::from(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::AvailabilityCores(tx))).into()) @@ -226,6 +234,8 @@ impl JobTrait for BitfieldSigningJob { ) -> Pin> + Send>> { let metrics = metrics.clone(); async move { + let span = jaeger::hash_span(&relay_parent, "run:bitfield-signing"); + let _span = span.child("delay"); let wait_until = Instant::now() + JOB_DELAY; // now do all the work we can before we need to wait for the availability store @@ -243,6 +253,9 @@ impl JobTrait for BitfieldSigningJob { // JOB_DELAY each time. 
let _timer = metrics.time_run(); + drop(_span); + let _span = span.child("availablity"); + let bitfield = match construct_availability_bitfield(relay_parent, validator.index(), &mut sender).await { @@ -255,12 +268,18 @@ impl JobTrait for BitfieldSigningJob { Ok(bitfield) => bitfield, }; + drop(_span); + let _span = span.child("signing"); + let signed_bitfield = validator .sign(keystore.clone(), bitfield) .await .map_err(|e| Error::Keystore(e))?; metrics.on_bitfield_signed(); + drop(_span); + let _span = span.child("gossip"); + sender .send( AllMessages::from( diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 5812a47f15..3958ffe280 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -25,6 +25,7 @@ use futures::{ }; use sp_keystore::SyncCryptoStorePtr; use polkadot_node_subsystem::{ + jaeger, errors::ChainApiError, messages::{ AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage, @@ -99,7 +100,9 @@ impl JobTrait for CandidateSelectionJob { receiver: mpsc::Receiver, mut sender: mpsc::Sender, ) -> Pin> + Send>> { + let span = jaeger::hash_span(&relay_parent, "candidate-selection:run"); async move { + let _span = span.child("query runtime"); let (groups, cores) = futures::try_join!( try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await), try_runtime_api!(request_from_runtime( @@ -112,6 +115,9 @@ impl JobTrait for CandidateSelectionJob { let (validator_groups, group_rotation_info) = try_runtime_api!(groups); let cores = try_runtime_api!(cores); + drop(_span); + let _span = span.child("find assignment"); + let n_cores = cores.len(); let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await { @@ -141,7 +147,9 @@ impl JobTrait for CandidateSelectionJob { None => return Ok(()), }; - CandidateSelectionJob::new(assignment, metrics, sender, 
receiver).run_loop().await + drop(_span); + + CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop(&span).await }.boxed() } } @@ -162,7 +170,8 @@ impl CandidateSelectionJob { } } - async fn run_loop(&mut self) -> Result<(), Error> { + async fn run_loop(&mut self, span: &jaeger::JaegerSpan) -> Result<(), Error> { + let span = span.child("run loop"); loop { match self.receiver.next().await { Some(CandidateSelectionMessage::Collation( @@ -170,12 +179,14 @@ impl CandidateSelectionJob { para_id, collator_id, )) => { + let _span = span.child("handle collation"); self.handle_collation(relay_parent, para_id, collator_id).await; } Some(CandidateSelectionMessage::Invalid( _, candidate_receipt, )) => { + let _span = span.child("handle invalid"); self.handle_invalid(candidate_receipt).await; } None => break, @@ -459,10 +470,10 @@ mod tests { }; preconditions(&mut job); - + let span = jaeger::JaegerSpan::Disabled; let (_, job_result) = futures::executor::block_on(future::join( test(to_job_tx, from_job_rx), - job.run_loop(), + job.run_loop(&span), )); postconditions(job, job_result); diff --git a/polkadot/node/core/proposer/src/lib.rs b/polkadot/node/core/proposer/src/lib.rs index e5b4c61341..63b3859422 100644 --- a/polkadot/node/core/proposer/src/lib.rs +++ b/polkadot/node/core/proposer/src/lib.rs @@ -20,7 +20,10 @@ use futures::prelude::*; use futures::select; -use polkadot_node_subsystem::{messages::{AllMessages, ProvisionerInherentData, ProvisionerMessage}, SubsystemError}; +use polkadot_node_subsystem::{ + jaeger, + messages::{AllMessages, ProvisionerInherentData, ProvisionerMessage}, SubsystemError, +}; use polkadot_overseer::OverseerHandler; use polkadot_primitives::v1::{ Block, Hash, Header, @@ -193,6 +196,9 @@ where record_proof: RecordProof, ) -> Self::Proposal { async move { + let span = jaeger::hash_span(&self.parent_header_hash, "propose"); + let _span = span.child("get provisioner"); + let provisioner_data = match 
self.get_provisioner_data().await { Ok(pd) => pd, Err(err) => { @@ -201,11 +207,14 @@ where } }; + drop(_span); + inherent_data.put_data( polkadot_primitives::v1::INCLUSION_INHERENT_IDENTIFIER, &provisioner_data, )?; + let _span = span.child("authorship propose"); self.inner .propose(inherent_data, inherent_digests, max_duration, record_proof) .await diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 4e985a2622..fe89a6136c 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -26,6 +26,7 @@ use futures::{ }; use polkadot_node_subsystem::{ errors::{ChainApiError, RuntimeApiError}, + jaeger, messages::{ AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, @@ -154,10 +155,12 @@ impl JobTrait for ProvisioningJob { sender, receiver, ); + + let span = jaeger::hash_span(&relay_parent, "provisioner"); // it isn't necessary to break run_loop into its own function, // but it's convenient to separate the concerns in this way - job.run_loop().await + job.run_loop(&span).await } .boxed() } @@ -183,15 +186,15 @@ impl ProvisioningJob { } } - async fn run_loop(mut self) -> Result<(), Error> { + async fn run_loop(mut self, span: &jaeger::JaegerSpan) -> Result<(), Error> { use ProvisionerMessage::{ ProvisionableData, RequestBlockAuthorshipData, RequestInherentData, }; - loop { futures::select! 
{ msg = self.receiver.next().fuse() => match msg { Some(RequestInherentData(_, return_sender)) => { + let _span = span.child("req inherent data"); let _timer = self.metrics.time_request_inherent_data(); if self.inherent_after.is_ready() { @@ -201,9 +204,11 @@ impl ProvisioningJob { } } Some(RequestBlockAuthorshipData(_, sender)) => { + let _span = span.child("req block authorship"); self.provisionable_data_channels.push(sender) } Some(ProvisionableData(_, data)) => { + let _span = span.child("provisionable data"); let _timer = self.metrics.time_provisionable_data(); let mut bad_indices = Vec::new(); @@ -241,6 +246,7 @@ impl ProvisioningJob { None => break, }, _ = self.inherent_after.ready().fuse() => { + let _span = span.child("send inherent data"); let return_senders = std::mem::take(&mut self.awaiting_inherent); if !return_senders.is_empty() { self.send_inherent_data(return_senders).await; diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 4ca045a7bc..203e7ef260 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -44,6 +44,7 @@ use polkadot_subsystem::messages::{ NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, }; use polkadot_subsystem::{ + jaeger, errors::{ChainApiError, RuntimeApiError}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, @@ -354,6 +355,8 @@ where } }; + let mut _span = jaeger::hash_span(&gossiped_availability.candidate_hash.0, "availability-message-received"); + process_incoming_peer_message(ctx, state, remote, gossiped_availability, metrics) .await?; } diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 2d1313c58e..ccf5e26f43 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ 
b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -27,6 +27,7 @@ use futures::{channel::oneshot, FutureExt}; use polkadot_subsystem::messages::*; use polkadot_subsystem::{ + jaeger, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; @@ -180,7 +181,9 @@ impl BitfieldDistribution { for relay_parent in activated { tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "activated"); - // query basic system parameters once + let _span = jaeger::hash_span(&relay_parent, "bitfield-dist:active_leaves:basics"); + + // query validator set and signing context per relay_parent once only match query_basics(&mut ctx, relay_parent).await { Ok(Some((validator_set, signing_context))) => { // If our runtime API fails, we don't take down the node, @@ -232,6 +235,7 @@ where Context: SubsystemContext, { tracing::trace!(target: LOG_TARGET, rep = ?rep, peer_id = %peer, "reputation change"); + ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::ReportPeer(peer, rep), )) @@ -306,6 +310,9 @@ async fn relay_message( where Context: SubsystemContext, { + let span = jaeger::hash_span(&message.relay_parent, "relay_msg"); + + let _span = span.child("provisionable"); // notify the overseer about a new and valid signed bitfield ctx.send_message(AllMessages::Provisioner( ProvisionerMessage::ProvisionableData( @@ -318,6 +325,9 @@ where )) .await; + drop(_span); + + let _span = span.child("interested peers"); // pass on the bitfield distribution to all interested peers let interested_peers = peer_views .iter() @@ -341,6 +351,7 @@ where } }) .collect::>(); + drop(_span); if interested_peers.is_empty() { tracing::trace!( @@ -349,6 +360,7 @@ where "no peers are interested in gossip for relay parent", ); } else { + let _span = span.child("gossip"); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendValidationMessage( 
interested_peers, @@ -483,6 +495,8 @@ where NetworkBridgeEvent::PeerMessage(remote, message) => { match message { protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => { + let mut _span = jaeger::hash_span(&relay_parent, "bitfield-gossip-received"); + _span.add_string_tag("peer-id", &remote.to_base58()); tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer"); let gossiped_bitfield = BitfieldGossipMessage { relay_parent, @@ -581,6 +595,8 @@ where return; }; + let _span = jaeger::hash_span(&message.relay_parent, "gossip"); + job_data.message_sent_to_peer .entry(dest.clone()) .or_default() diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 294be06190..60208ec8c6 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -24,6 +24,7 @@ use polkadot_primitives::v1::{ CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, }; use polkadot_subsystem::{ + jaeger, FromOverseer, OverseerSignal, SubsystemContext, messages::{ AllMessages, CollatorProtocolMessage, @@ -430,6 +431,8 @@ async fn process_msg( state.collating_on = Some(id); } DistributeCollation(receipt, pov) => { + let _span1 = jaeger::hash_span(&receipt.descriptor.relay_parent, "distributing-collation"); + let _span2 = jaeger::pov_span(&pov, "distributing-collation"); match state.collating_on { Some(id) if receipt.descriptor.para_id != id => { // If the ParaId of a collation requested to be distributed does not match @@ -539,10 +542,12 @@ async fn handle_incoming_peer_message( ); } RequestCollation(request_id, relay_parent, para_id) => { + let _span = jaeger::hash_span(&relay_parent, "rx-collation-request"); match state.collating_on { Some(our_para_id) => { if our_para_id == para_id { if let Some(collation) = 
state.collations.get(&relay_parent).cloned() { + let _span = _span.child("sending"); send_collation(ctx, state, request_id, origin, collation.0, collation.1).await; } } else { diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index 3af5aba5e4..a865dc75ee 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -30,6 +30,7 @@ use polkadot_primitives::v1::{ Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV, }; use polkadot_subsystem::{ + jaeger, FromOverseer, OverseerSignal, SubsystemContext, messages::{ AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage, @@ -504,6 +505,7 @@ where state.peer_views.entry(origin).or_default(); } AdvertiseCollation(relay_parent, para_id) => { + let _span = jaeger::hash_span(&relay_parent, "advertising-collation"); state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent)); if let Some(collator) = state.known_collators.get(&origin) { @@ -515,6 +517,8 @@ where modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; } Collation(request_id, receipt, pov) => { + let _span1 = jaeger::hash_span(&receipt.descriptor.relay_parent, "received-collation"); + let _span2 = jaeger::pov_span(&pov, "received-collation"); received_collation(ctx, state, origin, request_id, receipt, pov).await; } } @@ -659,6 +663,7 @@ where ); } FetchCollation(relay_parent, collator_id, para_id, tx) => { + let _span = jaeger::hash_span(&relay_parent, "fetching-collation"); fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await; } ReportCollator(id) => { diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 5e6a58f948..ac2adf094d 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ 
b/polkadot/node/network/statement-distribution/src/lib.rs @@ -23,6 +23,7 @@ #![warn(missing_docs)] use polkadot_subsystem::{ + jaeger, Subsystem, SubsystemResult, SubsystemContext, SpawnedSubsystem, ActiveLeavesUpdate, FromOverseer, OverseerSignal, messages::{ @@ -828,6 +829,7 @@ async fn handle_network_update( ).await; if let Some((relay_parent, new)) = new_stored { + let mut _span = jaeger::hash_span(&relay_parent, "sending-statement"); // When we receive a new message from a peer, we forward it to the // candidate backing subsystem. let message = AllMessages::CandidateBacking( @@ -943,6 +945,7 @@ impl StatementDistribution { FromOverseer::Communication { msg } => match msg { StatementDistributionMessage::Share(relay_parent, statement) => { let _timer = metrics.time_share(); + let mut _span = jaeger::hash_span(&relay_parent, "circulate-statement"); inform_statement_listeners( &statement, diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 0bb1769d9a..c62d4d526f 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -35,12 +35,16 @@ use { polkadot_primitives::v1::ParachainHost, sc_authority_discovery::Service as AuthorityDiscoveryService, sp_blockchain::HeaderBackend, - sp_core::traits::SpawnNamed, sp_keystore::SyncCryptoStorePtr, sp_trie::PrefixedMemoryDB, sc_client_api::ExecutorProvider, }; +use sp_core::traits::SpawnNamed; + + +use polkadot_subsystem::jaeger; + use std::sync::Arc; use prometheus_endpoint::Registry; @@ -97,7 +101,6 @@ native_executor_instance!( frame_benchmarking::benchmarking::HostFunctions, ); - #[derive(thiserror::Error, Debug)] pub enum Error { #[error(transparent)] @@ -121,6 +124,9 @@ pub enum Error { #[error(transparent)] Prometheus(#[from] prometheus_endpoint::PrometheusError), + #[error(transparent)] + Jaeger(#[from] polkadot_subsystem::jaeger::JaegerError), + #[cfg(feature = "full-node")] #[error(transparent)] Availability(#[from] AvailabilityError), @@ -162,6 +168,20 
@@ fn set_prometheus_registry(config: &mut Configuration) -> Result<(), Error> { Ok(()) } +/// Initialize the `Jaeger` collector. The destination must listen +/// on the given address and port for `UDP` packets. +fn jaeger_launch_collector_with_agent(spawner: impl SpawnNamed, config: &Configuration, agent: Option) -> Result<(), Error> { + if let Some(agent) = agent { + let cfg = jaeger::JaegerConfig::builder() + .agent(agent) + .named(&config.network.node_name) + .build(); + + jaeger::Jaeger::new(cfg).launch(spawner)?; + } + Ok(()) +} + pub type FullBackend = service::TFullBackend; #[cfg(feature = "full-node")] type FullSelectChain = sc_consensus::LongestChain; @@ -177,7 +197,7 @@ type LightClient = service::TLightClientWithBackend; #[cfg(feature = "full-node")] -fn new_partial(config: &mut Configuration) -> Result< +fn new_partial(config: &mut Configuration, jaeger_agent: Option) -> Result< service::PartialComponents< FullClient, FullBackend, FullSelectChain, consensus_common::DefaultImportQueue>, @@ -207,12 +227,15 @@ fn new_partial(config: &mut Configuration) -> Result< { set_prometheus_registry(config)?; + let inherent_data_providers = inherents::InherentDataProviders::new(); let (client, backend, keystore_container, task_manager) = service::new_full_parts::(&config)?; let client = Arc::new(client); + jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?; + let select_chain = sc_consensus::LongestChain::new(backend.clone()); let transaction_pool = sc_transaction_pool::BasicPool::new_full( @@ -507,6 +530,7 @@ pub fn new_full( mut config: Configuration, is_collator: IsCollator, grandpa_pause: Option<(u32, u32)>, + jaeger_agent: Option, isolation_strategy: IsolationStrategy, ) -> Result>>, Error> where @@ -532,7 +556,7 @@ pub fn new_full( transaction_pool, inherent_data_providers, other: (rpc_extensions_builder, import_setup, rpc_setup) - } = new_partial::(&mut config)?; + } = new_partial::(&mut config, jaeger_agent)?; let
prometheus_registry = config.prometheus_registry().cloned(); @@ -676,7 +700,7 @@ pub fn new_full( task_manager.spawn_handle(), client.clone(), transaction_pool, - overseer_handler.as_ref().ok_or_else(|| Error::AuthoritiesRequireRealOverseer)?.clone(), + overseer_handler.as_ref().ok_or(Error::AuthoritiesRequireRealOverseer)?.clone(), prometheus_registry.as_ref(), ); @@ -879,7 +903,7 @@ fn new_light(mut config: Configuration) -> Result<(TaskManage /// Builds a new object suitable for chain operations. #[cfg(feature = "full-node")] -pub fn new_chain_ops(mut config: &mut Configuration) -> Result< +pub fn new_chain_ops(mut config: &mut Configuration, jaeger_agent: Option) -> Result< ( Arc, Arc, @@ -892,19 +916,19 @@ pub fn new_chain_ops(mut config: &mut Configuration) -> Result< config.keystore = service::config::KeystoreConfig::InMemory; if config.chain_spec.is_rococo() { let service::PartialComponents { client, backend, import_queue, task_manager, .. } - = new_partial::(config)?; + = new_partial::(config, jaeger_agent)?; Ok((Arc::new(Client::Rococo(client)), backend, import_queue, task_manager)) } else if config.chain_spec.is_kusama() { let service::PartialComponents { client, backend, import_queue, task_manager, .. } - = new_partial::(config)?; + = new_partial::(config, jaeger_agent)?; Ok((Arc::new(Client::Kusama(client)), backend, import_queue, task_manager)) } else if config.chain_spec.is_westend() { let service::PartialComponents { client, backend, import_queue, task_manager, .. } - = new_partial::(config)?; + = new_partial::(config, jaeger_agent)?; Ok((Arc::new(Client::Westend(client)), backend, import_queue, task_manager)) } else { let service::PartialComponents { client, backend, import_queue, task_manager, .. 
} - = new_partial::(config)?; + = new_partial::(config, jaeger_agent)?; Ok((Arc::new(Client::Polkadot(client)), backend, import_queue, task_manager)) } } @@ -927,12 +951,14 @@ pub fn build_full( config: Configuration, is_collator: IsCollator, grandpa_pause: Option<(u32, u32)>, + jaeger_agent: Option, ) -> Result, Error> { if config.chain_spec.is_rococo() { new_full::( config, is_collator, grandpa_pause, + jaeger_agent, Default::default(), ).map(|full| full.with_client(Client::Rococo)) } else if config.chain_spec.is_kusama() { @@ -940,6 +966,7 @@ pub fn build_full( config, is_collator, grandpa_pause, + jaeger_agent, Default::default(), ).map(|full| full.with_client(Client::Kusama)) } else if config.chain_spec.is_westend() { @@ -947,6 +974,7 @@ pub fn build_full( config, is_collator, grandpa_pause, + jaeger_agent, Default::default(), ).map(|full| full.with_client(Client::Westend)) } else { @@ -954,6 +982,7 @@ pub fn build_full( config, is_collator, grandpa_pause, + jaeger_agent, Default::default(), ).map(|full| full.with_client(Client::Polkadot)) } diff --git a/polkadot/node/subsystem/Cargo.toml b/polkadot/node/subsystem/Cargo.toml index e70e2171ec..73d688e775 100644 --- a/polkadot/node/subsystem/Cargo.toml +++ b/polkadot/node/subsystem/Cargo.toml @@ -6,14 +6,17 @@ edition = "2018" description = "Subsystem traits and message definitions" [dependencies] +async-std = "1.8.0" async-trait = "0.1.42" derive_more = "0.99.11" futures = "0.3.8" futures-timer = "3.0.2" +mick-jaeger = "0.1.1" +lazy_static = "1.4" tracing = "0.1.22" tracing-futures = "0.2.4" parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] } -parking_lot = { version = "0.11.1", optional = true } +parking_lot = "0.11.1" pin-project = "1.0.2" polkadot-node-primitives = { path = "../primitives" } polkadot-node-network-protocol = { path = "../network/protocol" } @@ -24,10 +27,10 @@ smallvec = "1.5.1" sp-core = { git = "https://github.com/paritytech/substrate", branch = 
"master" } substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } thiserror = "1.0.22" +log = "0.4.11" [dev-dependencies] assert_matches = "1.4.0" async-trait = "0.1.42" futures = { version = "0.3.8", features = ["thread-pool"] } -parking_lot = "0.11.1" polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } diff --git a/polkadot/node/subsystem/src/errors.rs b/polkadot/node/subsystem/src/errors.rs index 5af573c87f..69ddd86108 100644 --- a/polkadot/node/subsystem/src/errors.rs +++ b/polkadot/node/subsystem/src/errors.rs @@ -59,3 +59,21 @@ impl core::fmt::Display for ChainApiError { } impl std::error::Error for ChainApiError {} + + +/// Errors that can occur while launching the jaeger collector or sending spans to the jaeger agent. +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum JaegerError { + #[error("Already launched the collector thread")] + AlreadyLaunched, + + #[error("Missing jaeger configuration")] + MissingConfiguration, + + #[error("Failed to allocate port for UDP transfer to jaeger agent")] + PortAllocationError(#[source] std::io::Error), + + #[error("Failed to send jaeger span to agent")] + SendError(#[source] std::io::Error), +} diff --git a/polkadot/node/subsystem/src/jaeger.rs b/polkadot/node/subsystem/src/jaeger.rs new file mode 100644 index 0000000000..56ba883354 --- /dev/null +++ b/polkadot/node/subsystem/src/jaeger.rs @@ -0,0 +1,251 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see http://www.gnu.org/licenses/. + +//! Jaeger integration. +//! +//! See https://www.jaegertracing.io/ for an introduction. +//! +//! The easiest way to try Jaeger is: +//! +//! - Start a docker container with the all-in-one docker image (see below). +//! - Open your browser and navigate to http://localhost:16686 to access the UI. +//! +//! The all-in-one image can be started with: +//! +//! ```not_rust +//! podman login docker.io +//! podman run -d --name jaeger \ +//! -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \ +//! -p 5775:5775/udp \ +//! -p 6831:6831/udp \ +//! -p 6832:6832/udp \ +//! -p 5778:5778 \ +//! -p 16686:16686 \ +//! -p 14268:14268 \ +//! -p 14250:14250 \ +//! -p 9411:9411 \ +//! docker.io/jaegertracing/all-in-one:1.21 +//! ``` +//! + +use polkadot_node_primitives::SpawnNamed; +use polkadot_primitives::v1::{Hash, PoV, CandidateHash}; +use parking_lot::RwLock; +use std::sync::Arc; +use std::result; +pub use crate::errors::JaegerError; + + +lazy_static::lazy_static! { + static ref INSTANCE: RwLock = RwLock::new(Jaeger::None); +} + +/// Configuration for the jaeger tracing. +#[derive(Clone)] +pub struct JaegerConfig { + node_name: String, + agent_addr: std::net::SocketAddr, +} + +impl std::default::Default for JaegerConfig { + fn default() -> Self { + Self { + node_name: "unknown_".to_owned(), + agent_addr: "127.0.0.1:6831".parse().expect(r#"Static "127.0.0.1:6831" is a valid socket address string. qed"#), + } + } +} + +impl JaegerConfig { + /// Use the builder pattern to construct a configuration. + pub fn builder() -> JaegerConfigBuilder { + JaegerConfigBuilder::default() + } +} + + +/// Jaeger configuration builder. +#[derive(Default)] +pub struct JaegerConfigBuilder { + inner: JaegerConfig +} + +impl JaegerConfigBuilder { + /// Set the name for this node.
+ pub fn named(mut self, name: S) -> Self where S: AsRef { + self.inner.node_name = name.as_ref().to_owned(); + self + } + + /// Set the agent address to send the collected spans to. + pub fn agent(mut self, addr: U) -> Self where U: Into { + self.inner.agent_addr = addr.into(); + self + } + + /// Construct the configuration. + pub fn build(self) -> JaegerConfig { + self.inner + } +} + +/// A wrapper type for a span. +/// +/// Handles running with and without jaeger. +pub enum JaegerSpan { + /// Running with jaeger being enabled. + Enabled(mick_jaeger::Span), + /// Running with jaeger disabled. + Disabled, +} + +impl JaegerSpan { + /// Derive a child span from `self`. + pub fn child(&self, name: impl Into) -> Self { + match self { + Self::Enabled(inner) => Self::Enabled(inner.child(name)), + Self::Disabled => Self::Disabled, + } + } + /// Add an additional tag to the span. + pub fn add_string_tag(&mut self, tag: &str, value: &str) { + match self { + Self::Enabled(ref mut inner) => inner.add_string_tag(tag, value), + Self::Disabled => {}, + } + } +} + +impl From> for JaegerSpan { + fn from(src: Option) -> Self { + if let Some(span) = src { + Self::Enabled(span) + } else { + Self::Disabled + } + } +} + +impl From for JaegerSpan { + fn from(src: mick_jaeger::Span) -> Self { + Self::Enabled(src) + } +} + +/// Shortcut for [`hash_span`] with the hash of the `Candidate` block. +#[inline(always)] +pub fn candidate_hash_span(candidate_hash: &CandidateHash, span_name: impl Into) -> JaegerSpan { + INSTANCE.read_recursive().span(|| { candidate_hash.0 }, span_name).into() +} + +/// Shortcut for [`hash_span`] with the hash of the `PoV`. +#[inline(always)] +pub fn pov_span(pov: &PoV, span_name: impl Into) -> JaegerSpan { + INSTANCE.read_recursive().span(|| { pov.hash() }, span_name).into() +} + +/// Creates a `Span` referring to the given hash.
All spans created with [`hash_span`] with the +/// same hash (even from multiple different nodes) will be visible in the same view on Jaeger. +#[inline(always)] +pub fn hash_span(hash: &Hash, span_name: impl Into) -> JaegerSpan { + INSTANCE.read_recursive().span(|| { *hash }, span_name).into() +} + +/// Stateful convenience wrapper around [`mick_jaeger`]. +pub enum Jaeger { + /// Launched and operational state. + Launched { + /// [`mick_jaeger`] provided API to record spans to. + traces_in: Arc, + }, + /// Preparation state with the necessary config to launch the collector. + Prep(JaegerConfig), + /// Uninitialized, suggests wrong API usage if encountered. + None, +} + +impl Jaeger { + /// Spawn the jaeger instance. + pub fn new(cfg: JaegerConfig) -> Self { + Jaeger::Prep(cfg) + } + + /// Spawn the background task in order to send the tracing information out via udp + #[cfg(target_os = "unknown")] + pub fn launch(self, _spawner: S) -> result::Result<(), JaegerError> { + Ok(()) + } + + /// Spawn the background task in order to send the tracing information out via udp + #[cfg(not(target_os = "unknown"))] + pub fn launch(self, spawner: S) -> result::Result<(), JaegerError> { + let cfg = match self { + Self::Prep(cfg) => Ok(cfg), + Self::Launched{ .. } => { + return Err(JaegerError::AlreadyLaunched) + } + Self::None => Err(JaegerError::MissingConfiguration), + }?; + + let jaeger_agent = cfg.agent_addr; + + log::info!("🐹 Collecting jaeger spans for {:?}", &jaeger_agent); + + let (traces_in, mut traces_out) = mick_jaeger::init(mick_jaeger::Config { + service_name: format!("{}-{}", cfg.node_name, cfg.node_name), + }); + + // Spawn a background task that pulls span information and sends them on the network. 
+ spawner.spawn("jaeger-collector", Box::pin(async move { + let res = async_std::net::UdpSocket::bind("127.0.0.1:0").await + .map_err(JaegerError::PortAllocationError); + match res { + Ok(udp_socket) => loop { + let buf = traces_out.next().await; + // UDP sending errors happen only either if the API is misused or in case of missing privilege. + if let Err(e) = udp_socket.send_to(&buf, jaeger_agent).await + .map_err(|e| JaegerError::SendError(e)) + { + log::trace!("Jaeger: {:?}", e); + } + } + Err(e) => { + log::warn!("Jaeger: {:?}", e); + } + } + })); + + *INSTANCE.write() = Self::Launched { + traces_in, + }; + Ok(()) + } + + fn span(&self, lazy_hash: F, span_name: impl Into) -> Option + where + F: Fn() -> Hash, + { + if let Self::Launched { traces_in , .. } = self { + let hash = lazy_hash(); + let mut buf = [0u8; 16]; + buf.copy_from_slice(&hash.as_ref()[0..16]); + let trace_id = std::num::NonZeroU128::new(u128::from_be_bytes(buf))?; + Some(traces_in.span(trace_id, span_name)) + } else { + None + } + } +} diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 1d9d7b7352..f726e2a157 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -31,12 +31,13 @@ use futures::future::BoxFuture; use polkadot_primitives::v1::Hash; use async_trait::async_trait; use smallvec::SmallVec; -use thiserror::Error; use crate::messages::AllMessages; pub mod errors; pub mod messages; +pub mod jaeger; +pub use crate::jaeger::*; /// How many slots are stack-reserved for active leaves updates /// @@ -119,29 +120,27 @@ pub enum FromOverseer { /// * Subsystems dying when they are not expected to /// * Subsystems not dying when they are told to die /// * etc. -#[derive(Error, Debug)] +#[derive(thiserror::Error, Debug)] +#[allow(missing_docs)] pub enum SubsystemError { - /// A notification connection is no longer valid. 
#[error(transparent)] NotifyCancellation(#[from] oneshot::Canceled), - /// Queue does not accept another item. #[error(transparent)] QueueError(#[from] mpsc::SendError), - /// An attempt to spawn a futures task did not succeed. #[error(transparent)] TaskSpawn(#[from] futures::task::SpawnError), - /// An infallable error. #[error(transparent)] Infallible(#[from] std::convert::Infallible), - /// Prometheus had a problem #[error(transparent)] Prometheus(#[from] substrate_prometheus_endpoint::PrometheusError), - /// An other error lacking particular type information. + #[error(transparent)] + Jaeger(#[from] errors::JaegerError), + #[error("Failed to {0}")] Context(String), diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs index 7a37e3db31..76d2ab735a 100644 --- a/polkadot/node/test/service/src/lib.rs +++ b/polkadot/node/test/service/src/lib.rs @@ -81,6 +81,7 @@ pub fn new_full( config, is_collator, None, + None, polkadot_parachain::wasm_executor::IsolationStrategy::InProcess, ) } diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index 49ffc379eb..09998e9bc9 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -61,6 +61,7 @@ fn main() -> Result<()> { config, polkadot_service::IsCollator::Yes(collator.collator_id()), None, + None, ).map_err(|e| e.to_string())?; let mut overseer_handler = full_node .overseer_handler