create more realistic soak test so that we can see a more representative UI in action

This commit is contained in:
James Wilson
2021-08-03 15:40:20 +01:00
parent a85e13e0ec
commit 5d81128e74
6 changed files with 363 additions and 7 deletions
+16 -3
View File
@@ -1,5 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "aho-corasick"
version = "0.7.18"
@@ -140,7 +142,7 @@ dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"time 0.1.43",
"winapi",
]
@@ -608,9 +610,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.97"
version = "0.2.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12b8adadd720df158f4d70dfe7ccc6adb0472d7c55ca83445f6a5ab3e36f8fb6"
checksum = "320cfe77175da3a483efed4bc0adc1968ca050b098ce4f2f1c13a56626128790"
[[package]]
name = "lock_api"
@@ -1336,6 +1338,7 @@ dependencies = [
"serde_json",
"soketto",
"thiserror",
"time 0.3.0",
"tokio",
"tokio-util",
]
@@ -1379,6 +1382,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "time"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6cf2535c6456e772ad756a0854ec907ede55d73d0b5a34855d808cb2d2f0942e"
dependencies = [
"itoa",
"libc",
]
[[package]]
name = "tinyvec"
version = "1.2.0"
+141 -4
View File
@@ -36,7 +36,7 @@ box; MacOS seems to hit limits quicker in general.
use common::node_types::BlockHash;
use common::ws_client::SentMessage;
use futures::StreamExt;
use futures::{StreamExt, future};
use serde_json::json;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -69,9 +69,12 @@ pub async fn soak_test() {
run_soak_test(opts).await;
}
/// The general soak test runner. This is called by tests.
/// A general soak test runner.
/// This test sends the same message over and over, and so
/// the results should be pretty reproducible.
async fn run_soak_test(opts: SoakTestOpts) {
let mut server = start_server_release().await;
println!("Telemetry core running at {}", server.get_core().host());
// Start up the shards we requested:
let mut shard_ids = vec![];
@@ -161,13 +164,16 @@ async fn run_soak_test(opts: SoakTestOpts) {
// Also start receiving messages, counting the bytes received so far.
let bytes_out = Arc::new(AtomicUsize::new(0));
let msgs_out = Arc::new(AtomicUsize::new(0));
for (_, mut feed_rx) in feeds {
let bytes_out = Arc::clone(&bytes_out);
let msgs_out = Arc::clone(&msgs_out);
tokio::task::spawn(async move {
while let Some(msg) = feed_rx.next().await {
let msg = msg.expect("message could be received");
let num_bytes = msg.len();
bytes_out.fetch_add(num_bytes, Ordering::Relaxed);
msgs_out.fetch_add(1, Ordering::Relaxed);
}
});
}
@@ -177,24 +183,29 @@ async fn run_soak_test(opts: SoakTestOpts) {
let one_mb = 1024.0 * 1024.0;
let mut last_bytes_in = 0;
let mut last_bytes_out = 0;
let mut last_msgs_out = 0;
let mut n = 1;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let bytes_in_val = bytes_in.load(Ordering::Relaxed);
let bytes_out_val = bytes_out.load(Ordering::Relaxed);
let msgs_out_val = msgs_out.load(Ordering::Relaxed);
println!(
"#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {})",
"#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {}, msgs out: {}, total msgs out: {})",
n,
(bytes_in_val - last_bytes_in) as f64 / one_mb,
(bytes_out_val - last_bytes_out) as f64 / one_mb,
bytes_in_val,
bytes_out_val
bytes_out_val,
(msgs_out_val - last_msgs_out),
msgs_out_val
);
n += 1;
last_bytes_in = bytes_in_val;
last_bytes_out = bytes_out_val;
last_msgs_out = msgs_out_val;
}
});
@@ -202,6 +213,132 @@ async fn run_soak_test(opts: SoakTestOpts) {
send_handle.await.unwrap();
}
/// Identical to `soak_test`, except that we try to send realistic messages from fake nodes.
/// This means it's potentially less reproducable, but presents a more accurate picture of
/// the load.
#[ignore]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
pub async fn realistic_soak_test() {
let opts = get_soak_test_opts();
run_realistic_soak_test(opts).await;
}
/// A general soak test runner.
/// This test sends realistic messages from connected nodes
/// so that we can see how things react under more normal
/// circumstances
async fn run_realistic_soak_test(opts: SoakTestOpts) {
let mut server = start_server_release().await;
println!("Telemetry core running at {}", server.get_core().host());
// Start up the shards we requested:
let mut shard_ids = vec![];
for _ in 0..opts.shards {
let shard_id = server.add_shard().await.expect("shard can't be added");
shard_ids.push(shard_id);
}
// Connect nodes to each shard:
let mut nodes = vec![];
for &shard_id in &shard_ids {
let mut conns = server
.get_shard(shard_id)
.unwrap()
.connect_multiple_nodes(opts.nodes)
.await
.expect("node connections failed");
nodes.append(&mut conns);
}
// Start nodes talking to the shards:
let bytes_in = Arc::new(AtomicUsize::new(0));
for node in nodes.into_iter().enumerate() {
let bytes_in = Arc::clone(&bytes_in);
tokio::spawn(async move {
let (idx, (tx, _)) = node;
let telemetry = test_utils::fake_telemetry::FakeTelemetry::new(
Duration::from_secs(3),
format!("Node {}", idx + 1),
"Polkadot".to_owned(),
idx + 1
);
let res = telemetry.start(|msg| async {
bytes_in.fetch_add(msg.len(), Ordering::Relaxed);
tx.unbounded_send(SentMessage::Binary(msg))?;
Ok::<_, anyhow::Error>(())
}).await;
if let Err(e) = res {
log::error!("Telemetry Node #{} has died with error: {}", idx, e);
}
});
}
// Connect feeds to the core:
let mut feeds = server
.get_core()
.connect_multiple_feeds(opts.feeds)
.await
.expect("feed connections failed");
// Every feed subscribes to the chain above to recv messages about it:
for (feed_tx, _) in &mut feeds {
feed_tx.send_command("subscribe", "Polkadot").unwrap();
}
// Also start receiving messages, counting the bytes received so far.
let bytes_out = Arc::new(AtomicUsize::new(0));
let msgs_out = Arc::new(AtomicUsize::new(0));
for (_, mut feed_rx) in feeds {
let bytes_out = Arc::clone(&bytes_out);
let msgs_out = Arc::clone(&msgs_out);
tokio::task::spawn(async move {
while let Some(msg) = feed_rx.next().await {
let msg = msg.expect("message could be received");
let num_bytes = msg.len();
bytes_out.fetch_add(num_bytes, Ordering::Relaxed);
msgs_out.fetch_add(1, Ordering::Relaxed);
}
});
}
// Periodically report on bytes out
tokio::task::spawn(async move {
let one_mb = 1024.0 * 1024.0;
let mut last_bytes_in = 0;
let mut last_bytes_out = 0;
let mut last_msgs_out = 0;
let mut n = 1;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let bytes_in_val = bytes_in.load(Ordering::Relaxed);
let bytes_out_val = bytes_out.load(Ordering::Relaxed);
let msgs_out_val = msgs_out.load(Ordering::Relaxed);
println!(
"#{}: MB in/out per measurement: {:.4} / {:.4}, total bytes in/out: {} / {}, msgs out: {}, total msgs out: {})",
n,
(bytes_in_val - last_bytes_in) as f64 / one_mb,
(bytes_out_val - last_bytes_out) as f64 / one_mb,
bytes_in_val,
bytes_out_val,
(msgs_out_val - last_msgs_out),
msgs_out_val
);
n += 1;
last_bytes_in = bytes_in_val;
last_bytes_out = bytes_out_val;
last_msgs_out = msgs_out_val;
}
});
// Wait forever.
future::pending().await
}
/// General arguments that are used to start a soak test. Run `soak_test` as
/// instructed by its documentation for full control over what is ran, or run
/// preconfigured variants.
+1
View File
@@ -17,3 +17,4 @@ thiserror = "1.0.25"
tokio = { version = "1.7.1", features = ["full"] }
tokio-util = { version = "0.6.7", features = ["full"] }
common = { path = "../common" }
time = { version = "0.3.0", features = ["formatting"] }
+197
View File
@@ -0,0 +1,197 @@
use std::time::Duration;
use std::future::Future;
use serde_json::json;
use common::node_types::BlockHash;
use tokio::time::{self, MissedTickBehavior};
use ::time::{ OffsetDateTime, format_description::well_known::Rfc3339 };
/// This emits fake but realistic looking telemetry messages.
/// Can be connected to a telemetry server to emit realistic messages.
pub struct FakeTelemetry {
block_time: Duration,
node_name: String,
chain: String,
message_id: usize
}
impl FakeTelemetry {
pub fn new(block_time: Duration, node_name: String, chain: String, message_id: usize) -> Self {
Self {
block_time,
node_name,
chain,
message_id
}
}
/// Begin emitting messages from this node, calling the provided callback each
/// time a new message is emitted.
// Unused assignments allowed because macro seems to mess with knowledge of what's
// been read.
#[allow(unused_assignments)]
pub async fn start<Func, Fut, E>(self, mut on_message: Func) -> Result<(), E>
where
Func: Send + FnMut(Vec<u8>) -> Fut,
Fut: Future<Output = Result<(), E>>,
E: Into<anyhow::Error>
{
let id = self.message_id;
let name = self.node_name;
let chain = self.chain;
let block_time = self.block_time;
// Our "state". These numbers can be hashed to give a block hash,
// and also represent the height of the chain so far. Increment each
// as needed.
let mut best_block_n: u64 = 0;
let mut finalized_block_n: u64 = 0;
// A helper to send JSON messages without consuming on_message:
macro_rules! send_msg {
($($json:tt)+) => {{
let msg = json!($($json)+);
let bytes = serde_json::to_vec(&msg).unwrap();
on_message(bytes).await
}}
}
// Send system connected immediately
send_msg!({
"id":id,
"payload": {
"authority":true,
"chain":chain,
"config":"",
"genesis_hash":block_hash(best_block_n),
"implementation":"Substrate Node",
"msg":"system.connected",
"name":name,
"network_id":"12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp",
"startup_time":"1627986634759",
"version":"2.0.0-07a1af348-aarch64-macos"
},
"ts":now_iso()
})?;
best_block_n += 1;
// First block import immediately (height 1)
send_msg!({
"id":id,
"payload":{
"best":block_hash(best_block_n),
"height":best_block_n,
"msg":"block.import",
"origin":"Own"
},
"ts":now_iso()
})?;
best_block_n += 1;
let now = tokio::time::Instant::now();
// Configure our message intervals:
let mut new_block_every = time::interval_at(now + block_time, block_time);
new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst);
let mut system_interval_every = time::interval_at(now + Duration::from_secs(2), block_time * 2);
new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst);
let mut finalised_every = time::interval_at(now + Duration::from_secs(1) + block_time * 3, block_time);
new_block_every.set_missed_tick_behavior(MissedTickBehavior::Burst);
// Send messages every interval:
loop {
tokio::select! {
// Add a new block:
_ = new_block_every.tick() => {
send_msg!({
"id":id,
"payload":{
"hash":"0x918bf5125307b4ac1b2c67aa43ed38517617720ac96cbd5664d7a0f0aa32e1b1", // Don't think this matters
"msg":"prepared_block_for_proposing",
"number":best_block_n.to_string() // seems to be a string, not a number in the "real" JSON
},
"ts":now_iso()
})?;
send_msg!({
"id":id,
"payload":{
"best":block_hash(best_block_n),
"height":best_block_n,
"msg":"block.import",
"origin":"Own"
},
"ts":now_iso()
})?;
best_block_n += 1;
},
// Periodic updates on system state:
_ = system_interval_every.tick() => {
send_msg!({
"id":id,
"payload":{
"best":block_hash(best_block_n),
"finalized_hash":block_hash(finalized_block_n),
"finalized_height":finalized_block_n,
"height":best_block_n,
"msg":"system.interval",
"txcount":0,
"used_state_cache_size":870775
},
"ts":now_iso()
})?;
send_msg!({
"id":id,
"payload":{
"bandwidth_download":0,
"bandwidth_upload":0,
"msg":"system.interval",
"peers":0
},
"ts":now_iso()
})?;
},
// Finalise a block:
_ = finalised_every.tick() => {
send_msg!({
"id":1,
"payload":{
"hash":block_hash(finalized_block_n),
"msg":"afg.finalized_blocks_up_to",
"number":finalized_block_n.to_string(), // string in "real" JSON.
},
"ts":now_iso()
})?;
send_msg!({
"id":1,
"payload":{
"best":block_hash(finalized_block_n),
"height":finalized_block_n.to_string(), // string in "real" JSON.
"msg":"notify.finalized"
},
"ts":now_iso()
})?;
finalized_block_n += 1;
},
};
}
}
}
fn now_iso() -> String {
OffsetDateTime::now_utc().format(&Rfc3339).unwrap()
}
/// Spread the u64 across the resulting u256 hash so that it's
/// more visible in the UI.
fn block_hash(n: u64) -> BlockHash {
let a: [u8; 32] = unsafe { std::mem::transmute([n,n,n,n]) };
BlockHash::from(a)
}
+3
View File
@@ -28,3 +28,6 @@ pub mod contains_matches;
/// Utilities to help with running tests from within this current workspace.
pub mod workspace;
/// A utility to generate fake telemetry messages at realistic intervals.
pub mod fake_telemetry;
+5
View File
@@ -352,6 +352,11 @@ impl<Channel> Process<Channel> {
self.id
}
/// Get the host that this process is running on
pub fn host(&self) -> &str {
&self.host
}
/// Kill the process and wait for this to complete
/// Not public: Klling done via Server.
async fn kill(self) -> Result<(), Error> {