diff --git a/backend/Cargo.lock b/backend/Cargo.lock index 6fbf5b0..2dbddc6 100644 --- a/backend/Cargo.lock +++ b/backend/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "aho-corasick" version = "0.7.18" @@ -140,7 +142,7 @@ dependencies = [ "libc", "num-integer", "num-traits", - "time", + "time 0.1.43", "winapi", ] @@ -608,9 +610,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.97" +version = "0.2.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6" +checksum = "320cfe77175da3a483efed4bc0adc1968ca050b098ce4f2f1c13a56626128790" [[package]] name = "lock_api" @@ -1336,6 +1338,7 @@ dependencies = [ "serde_json", "soketto", "thiserror", + "time 0.3.0", "tokio", "tokio-util", ] @@ -1379,6 +1382,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "time" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cf2535c6456e772ad756a0854ec907ede55d73d0b5a34855d808cb2d2f0942e" +dependencies = [ + "itoa", + "libc", +] + [[package]] name = "tinyvec" version = "1.2.0" diff --git a/backend/telemetry_core/tests/soak_tests.rs b/backend/telemetry_core/tests/soak_tests.rs index 81defbe..20c1c87 100644 --- a/backend/telemetry_core/tests/soak_tests.rs +++ b/backend/telemetry_core/tests/soak_tests.rs @@ -36,7 +36,7 @@ box; MacOS seems to hit limits quicker in general. use common::node_types::BlockHash; use common::ws_client::SentMessage; -use futures::StreamExt; +use futures::{StreamExt, future}; use serde_json::json; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -69,9 +69,12 @@ pub async fn soak_test() { run_soak_test(opts).await; } -/// The general soak test runner. This is called by tests. +/// A general soak test runner. +/// This test sends the same message over and over, and so +/// the results should be pretty reproducible. async fn run_soak_test(opts: SoakTestOpts) { let mut server = start_server_release().await; + println!("Telemetry core running at {}", server.get_core().host()); // Start up the shards we requested: let mut shard_ids = vec![]; @@ -161,13 +164,16 @@ async fn run_soak_test(opts: SoakTestOpts) { // Also start receiving messages, counting the bytes received so far. let bytes_out = Arc::new(AtomicUsize::new(0)); + let msgs_out = Arc::new(AtomicUsize::new(0)); for (_, mut feed_rx) in feeds { let bytes_out = Arc::clone(&bytes_out); + let msgs_out = Arc::clone(&msgs_out); tokio::task::spawn(async move { while let Some(msg) = feed_rx.next().await { let msg = msg.expect("message could be received"); let num_bytes = msg.len(); bytes_out.fetch_add(num_bytes, Ordering::Relaxed); + msgs_out.fetch_add(1, Ordering::Relaxed); } }); } @@ -177,24 +183,29 @@ async fn run_soak_test(opts: SoakTestOpts) { let one_mb = 1024.0 * 1024.0; let mut last_bytes_in = 0; let mut last_bytes_out = 0; + let mut last_msgs_out = 0; let mut n = 1; loop { tokio::time::sleep(Duration::from_secs(1)).await; let bytes_in_val = bytes_in.load(Ordering::Relaxed); let bytes_out_val = bytes_out.load(Ordering::Relaxed); + let msgs_out_val = msgs_out.load(Ordering::Relaxed); println!( - "#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {})", + "#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {}, msgs out: {}, total msgs out: {})", n, (bytes_in_val - last_bytes_in) as f64 / one_mb, (bytes_out_val - last_bytes_out) as f64 / one_mb, bytes_in_val, - bytes_out_val + bytes_out_val, + (msgs_out_val - last_msgs_out), + msgs_out_val ); n += 1; last_bytes_in = bytes_in_val; last_bytes_out = bytes_out_val; + last_msgs_out = msgs_out_val; } }); @@ -202,6 +213,132 @@ async fn run_soak_test(opts: SoakTestOpts) { send_handle.await.unwrap(); } +/// Identical to `soak_test`, except that we try to send realistic messages from fake nodes. +/// This means it's potentially less reproducable, but presents a more accurate picture of +/// the load. +#[ignore] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +pub async fn realistic_soak_test() { + let opts = get_soak_test_opts(); + run_realistic_soak_test(opts).await; +} + +/// A general soak test runner. +/// This test sends realistic messages from connected nodes +/// so that we can see how things react under more normal +/// circumstances +async fn run_realistic_soak_test(opts: SoakTestOpts) { + let mut server = start_server_release().await; + println!("Telemetry core running at {}", server.get_core().host()); + + // Start up the shards we requested: + let mut shard_ids = vec![]; + for _ in 0..opts.shards { + let shard_id = server.add_shard().await.expect("shard can't be added"); + shard_ids.push(shard_id); + } + + // Connect nodes to each shard: + let mut nodes = vec![]; + for &shard_id in &shard_ids { + let mut conns = server + .get_shard(shard_id) + .unwrap() + .connect_multiple_nodes(opts.nodes) + .await + .expect("node connections failed"); + nodes.append(&mut conns); + } + + // Start nodes talking to the shards: + let bytes_in = Arc::new(AtomicUsize::new(0)); + for node in nodes.into_iter().enumerate() { + let bytes_in = Arc::clone(&bytes_in); + tokio::spawn(async move { + let (idx, (tx, _)) = node; + + let telemetry = test_utils::fake_telemetry::FakeTelemetry::new( + Duration::from_secs(3), + format!("Node {}", idx + 1), + "Polkadot".to_owned(), + idx + 1 + ); + + let res = telemetry.start(|msg| async { + bytes_in.fetch_add(msg.len(), Ordering::Relaxed); + tx.unbounded_send(SentMessage::Binary(msg))?; + Ok::<_, anyhow::Error>(()) + }).await; + + if let Err(e) = res { + log::error!("Telemetry Node #{} has died with error: {}", idx, e); + } + }); + } + + // Connect feeds to the core: + let mut feeds = server + .get_core() + .connect_multiple_feeds(opts.feeds) + .await + .expect("feed connections failed"); + + // Every feed subscribes to the chain above to recv messages about it: + for (feed_tx, _) in &mut feeds { + feed_tx.send_command("subscribe", "Polkadot").unwrap(); + } + + // Also start receiving messages, counting the bytes received so far. + let bytes_out = Arc::new(AtomicUsize::new(0)); + let msgs_out = Arc::new(AtomicUsize::new(0)); + for (_, mut feed_rx) in feeds { + let bytes_out = Arc::clone(&bytes_out); + let msgs_out = Arc::clone(&msgs_out); + tokio::task::spawn(async move { + while let Some(msg) = feed_rx.next().await { + let msg = msg.expect("message could be received"); + let num_bytes = msg.len(); + bytes_out.fetch_add(num_bytes, Ordering::Relaxed); + msgs_out.fetch_add(1, Ordering::Relaxed); + } + }); + } + + // Periodically report on bytes out + tokio::task::spawn(async move { + let one_mb = 1024.0 * 1024.0; + let mut last_bytes_in = 0; + let mut last_bytes_out = 0; + let mut last_msgs_out = 0; + let mut n = 1; + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + let bytes_in_val = bytes_in.load(Ordering::Relaxed); + let bytes_out_val = bytes_out.load(Ordering::Relaxed); + let msgs_out_val = msgs_out.load(Ordering::Relaxed); + + println!( + "#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {}, msgs out: {}, total msgs out: {})", + n, + (bytes_in_val - last_bytes_in) as f64 / one_mb, + (bytes_out_val - last_bytes_out) as f64 / one_mb, + bytes_in_val, + bytes_out_val, + (msgs_out_val - last_msgs_out), + msgs_out_val + ); + + n += 1; + last_bytes_in = bytes_in_val; + last_bytes_out = bytes_out_val; + last_msgs_out = msgs_out_val; + } + }); + + // Wait forever. + future::pending().await +} + /// General arguments that are used to start a soak test. Run `soak_test` as /// instructed by its documentation for full control over what is ran, or run /// preconfigured variants. diff --git a/backend/test_utils/Cargo.toml b/backend/test_utils/Cargo.toml index 590d227..e3800c7 100644 --- a/backend/test_utils/Cargo.toml +++ b/backend/test_utils/Cargo.toml @@ -17,3 +17,4 @@ thiserror = "1.0.25" tokio = { version = "1.7.1", features = ["full"] } tokio-util = { version = "0.6.7", features = ["full"] } common = { path = "../common" } +time = { version = "0.3.0", features = ["formatting"] } diff --git a/backend/test_utils/src/fake_telemetry.rs b/backend/test_utils/src/fake_telemetry.rs new file mode 100644 index 0000000..c91d51a --- /dev/null +++ b/backend/test_utils/src/fake_telemetry.rs @@ -0,0 +1,197 @@ +use std::time::Duration; +use std::future::Future; +use serde_json::json; +use common::node_types::BlockHash; +use tokio::time::{self, MissedTickBehavior}; +use ::time::{ OffsetDateTime, format_description::well_known::Rfc3339 }; + +/// This emits fake but realistic looking telemetry messages. +/// Can be connected to a telemetry server to emit realistic messages. +pub struct FakeTelemetry { + block_time: Duration, + node_name: String, + chain: String, + message_id: usize +} + +impl FakeTelemetry { + pub fn new(block_time: Duration, node_name: String, chain: String, message_id: usize) -> Self { + Self { + block_time, + node_name, + chain, + message_id + } + } + + /// Begin emitting messages from this node, calling the provided callback each + /// time a new message is emitted. + // Unused assignments allowed because macro seems to mess with knowledge of what's + // been read. + #[allow(unused_assignments)] + pub async fn start(self, mut on_message: Func) -> Result<(), E> + where + Func: Send + FnMut(Vec) -> Fut, + Fut: Future>, + E: Into + { + + let id = self.message_id; + let name = self.node_name; + let chain = self.chain; + let block_time = self.block_time; + + // Our "state". These numbers can be hashed to give a block hash, + // and also represent the height of the chain so far. Increment each + // as needed. + let mut best_block_n: u64 = 0; + let mut finalized_block_n: u64 = 0; + + // A helper to send JSON messages without consuming on_message: + macro_rules! send_msg { + ($($json:tt)+) => {{ + let msg = json!($($json)+); + let bytes = serde_json::to_vec(&msg).unwrap(); + on_message(bytes).await + }} + } + + // Send system connected immediately + send_msg!({ + "id":id, + "payload": { + "authority":true, + "chain":chain, + "config":"", + "genesis_hash":block_hash(best_block_n), + "implementation":"Substrate Node", + "msg":"system.connected", + "name":name, + "network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp", + "startup_time":"1627986634759", + "version":"2.0.0-07a1af348-aarch64-macos" + }, + "ts":now_iso() + })?; + best_block_n += 1; + + // First block import immediately (height 1) + send_msg!({ + "id":id, + "payload":{ + "best":block_hash(best_block_n), + "height":best_block_n, + "msg":"block.import", + "origin":"Own" + }, + "ts":now_iso() + })?; + best_block_n += 1; + + let now = tokio::time::Instant::now(); + + // Configure our message intervals: + let mut new_block_every = time::interval_at(now + block_time, block_time); + new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst); + + let mut system_interval_every = time::interval_at(now + Duration::from_secs(2), block_time * 2); + new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst); + + let mut finalised_every = time::interval_at(now + Duration::from_secs(1) + block_time * 3, block_time); + new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst); + + // Send messages every interval: + loop { + tokio::select! { + // Add a new block: + _ = new_block_every.tick() => { + + send_msg!({ + "id":id, + "payload":{ + "hash":"0x918bf5125307b4ac1b2c67aa43ed38517617720ac96cbd5664d7a0f0aa32e1b1", // Don't think this matters + "msg":"prepared_block_for_proposing", + "number":best_block_n.to_string() // seems to be a string, not a number in the "real" JSON + }, + "ts":now_iso() + })?; + send_msg!({ + "id":id, + "payload":{ + "best":block_hash(best_block_n), + "height":best_block_n, + "msg":"block.import", + "origin":"Own" + }, + "ts":now_iso() + })?; + best_block_n += 1; + + }, + // Periodic updates on system state: + _ = system_interval_every.tick() => { + + send_msg!({ + "id":id, + "payload":{ + "best":block_hash(best_block_n), + "finalized_hash":block_hash(finalized_block_n), + "finalized_height":finalized_block_n, + "height":best_block_n, + "msg":"system.interval", + "txcount":0, + "used_state_cache_size":870775 + }, + "ts":now_iso() + })?; + send_msg!({ + "id":id, + "payload":{ + "bandwidth_download":0, + "bandwidth_upload":0, + "msg":"system.interval", + "peers":0 + }, + "ts":now_iso() + })?; + + }, + // Finalise a block: + _ = finalised_every.tick() => { + + send_msg!({ + "id":1, + "payload":{ + "hash":block_hash(finalized_block_n), + "msg":"afg.finalized_blocks_up_to", + "number":finalized_block_n.to_string(), // string in "real" JSON. + }, + "ts":now_iso() + })?; + send_msg!({ + "id":1, + "payload":{ + "best":block_hash(finalized_block_n), + "height":finalized_block_n.to_string(), // string in "real" JSON. + "msg":"notify.finalized" + }, + "ts":now_iso() + })?; + finalized_block_n += 1; + + }, + }; + } + } +} + +fn now_iso() -> String { + OffsetDateTime::now_utc().format(&Rfc3339).unwrap() +} + +/// Spread the u64 across the resulting u256 hash so that it's +/// more visible in the UI. +fn block_hash(n: u64) -> BlockHash { + let a: [u8; 32] = unsafe { std::mem::transmute([n,n,n,n]) }; + BlockHash::from(a) +} \ No newline at end of file diff --git a/backend/test_utils/src/lib.rs b/backend/test_utils/src/lib.rs index 6d614a7..1e9d119 100644 --- a/backend/test_utils/src/lib.rs +++ b/backend/test_utils/src/lib.rs @@ -28,3 +28,6 @@ pub mod contains_matches; /// Utilities to help with running tests from within this current workspace. pub mod workspace; + +/// A utility to generate fake telemetry messages at realistic intervals. +pub mod fake_telemetry; \ No newline at end of file diff --git a/backend/test_utils/src/server/server.rs b/backend/test_utils/src/server/server.rs index f322bda..70d0cb7 100644 --- a/backend/test_utils/src/server/server.rs +++ b/backend/test_utils/src/server/server.rs @@ -352,6 +352,11 @@ impl Process { self.id } + /// Get the host that this process is running on + pub fn host(&self) -> &str { + &self.host + } + /// Kill the process and wait for this to complete /// Not public: Klling done via Server. async fn kill(self) -> Result<(), Error> {