cargo fmt

This commit is contained in:
James Wilson
2021-07-28 16:15:34 +01:00
parent 83d31ef0b3
commit 5f022069db
4 changed files with 58 additions and 47 deletions
+10 -13
View File
@@ -1,6 +1,6 @@
use anyhow::{ anyhow, Error };
use anyhow::{anyhow, Error};
#[derive(Copy,Clone,Debug)]
#[derive(Copy, Clone, Debug)]
pub struct ByteSize(usize);
impl ByteSize {
@@ -38,9 +38,14 @@ impl std::str::FromStr for ByteSize {
"KiB" | "Ki" => n * 1024,
"MiB" | "Mi" => n * 1024 * 1024,
"GiB" | "Gi" => n * 1024 * 1024 * 1024,
_ => return Err(anyhow!("\
_ => {
return Err(anyhow!(
"\
Cannot parse into bytes; suffix is '{}', but expecting one of \
B,b, kB,K,k, MB,M,m, GB,G,g, KiB,Ki, MiB,Mi, GiB,Gi", suffix))
B,b, kB,K,k, MB,M,m, GB,G,g, KiB,Ki, MiB,Mi, GiB,Gi",
suffix
))
}
};
Ok(ByteSize(n))
}
@@ -52,34 +57,27 @@ impl std::str::FromStr for ByteSize {
mod test {
use crate::byte_size::ByteSize;
#[test]
fn can_parse_valid_strings() {
let cases = vec![
("100", 100),
("100B", 100),
("100b", 100),
("20kB", 20 * 1000),
("20 kB", 20 * 1000),
("20K", 20 * 1000),
(" 20k", 20 * 1000),
("1MB", 1 * 1000 * 1000),
("1M", 1 * 1000 * 1000),
("1m", 1 * 1000 * 1000),
("1 m", 1 * 1000 * 1000),
("1GB", 1 * 1000 * 1000 * 1000),
("1G", 1 * 1000 * 1000 * 1000),
("1g", 1 * 1000 * 1000 * 1000),
("1KiB", 1 * 1024),
("1Ki", 1 * 1024),
("1MiB", 1 * 1024 * 1024),
("1Mi", 1 * 1024 * 1024),
("1GiB", 1 * 1024 * 1024 * 1024),
("1Gi", 1 * 1024 * 1024 * 1024),
(" 1 Gi ", 1 * 1024 * 1024 * 1024),
@@ -90,5 +88,4 @@ mod test {
assert_eq!(b.into_bytes(), expected);
}
}
}
}
+2 -2
View File
@@ -1,13 +1,13 @@
pub mod byte_size;
pub mod http_utils;
pub mod id_type;
pub mod internal_messages;
pub mod node_message;
pub mod node_types;
pub mod ready_chunks_all;
pub mod rolling_total;
pub mod time;
pub mod ws_client;
pub mod rolling_total;
pub mod byte_size;
mod assign_id;
mod dense_map;
+42 -28
View File
@@ -1,6 +1,6 @@
use std::time::{ Duration, Instant };
use num_traits::{ Zero, SaturatingAdd, SaturatingSub };
use num_traits::{SaturatingAdd, SaturatingSub, Zero};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
/// Build an object responsible for keeping track of a rolling total.
/// It does this in constant time and using memory proportional to the
@@ -8,7 +8,7 @@ use std::collections::VecDeque;
pub struct RollingTotalBuilder<Time: TimeSource = SystemTimeSource> {
window_size_multiple: usize,
granularity: Duration,
time_source: Time
time_source: Time,
}
impl RollingTotalBuilder {
@@ -19,7 +19,7 @@ impl RollingTotalBuilder {
Self {
window_size_multiple: 10,
granularity: Duration::from_secs(1),
time_source: SystemTimeSource
time_source: SystemTimeSource,
}
}
@@ -28,7 +28,7 @@ impl RollingTotalBuilder {
RollingTotalBuilder {
window_size_multiple: self.window_size_multiple,
granularity: self.granularity,
time_source: val
time_source: val,
}
}
@@ -52,11 +52,12 @@ impl RollingTotalBuilder {
}
}
impl <Time: TimeSource> RollingTotalBuilder<Time> {
impl<Time: TimeSource> RollingTotalBuilder<Time> {
/// Create a [`RollingTotal`] with these setings, starting from the
/// instant provided.
pub fn start<T>(self) -> RollingTotal<T, Time>
where T: Zero + SaturatingAdd + SaturatingSub
where
T: Zero + SaturatingAdd + SaturatingSub,
{
let mut averages = VecDeque::new();
averages.push_back((self.time_source.now(), T::zero()));
@@ -66,7 +67,7 @@ impl <Time: TimeSource> RollingTotalBuilder<Time> {
time_source: self.time_source,
granularity: self.granularity,
averages,
total: T::zero()
total: T::zero(),
}
}
}
@@ -76,21 +77,18 @@ pub struct RollingTotal<Val, Time = SystemTimeSource> {
time_source: Time,
granularity: Duration,
averages: VecDeque<(Instant, Val)>,
total: Val
total: Val,
}
impl <Val, Time: TimeSource> RollingTotal<Val, Time>
impl<Val, Time: TimeSource> RollingTotal<Val, Time>
where
Val: SaturatingAdd + SaturatingSub + Copy + std::fmt::Debug,
Time: TimeSource
Time: TimeSource,
{
/// Add a new value at some time.
pub fn push(&mut self, value: Val) {
let time = self.time_source.now();
let (last_time, last_val) = self.averages
.back_mut()
.expect("always 1 value");
let (last_time, last_val) = self.averages.back_mut().expect("always 1 value");
let since_last_nanos = time.duration_since(*last_time).as_nanos();
let granularity_nanos = self.granularity.as_nanos();
@@ -105,14 +103,16 @@ where
let steps = since_last_nanos / granularity_nanos;
// Create a new time this number of jumps forward, and push it.
let new_time = *last_time + Duration::from_nanos(granularity_nanos as u64) * steps as u32;
let new_time =
*last_time + Duration::from_nanos(granularity_nanos as u64) * steps as u32;
self.total = self.total.saturating_add(&value);
self.averages.push_back((new_time, value));
// Remove any old times/values no longer within our window size. If window_size_multiple
// is 1, then we only keep the just-pushed time, hence the "-1". Remember to keep our
// cached total up to date if we remove things.
let oldest_time_in_window = new_time - (self.granularity * (self.window_size_multiple - 1) as u32);
let oldest_time_in_window =
new_time - (self.granularity * (self.window_size_multiple - 1) as u32);
while self.averages.front().expect("always 1 value").0 < oldest_time_in_window {
let value = self.averages.pop_front().expect("always 1 value").1;
self.total = self.total.saturating_sub(&value);
@@ -123,7 +123,6 @@ where
*last_val = last_val.saturating_add(&value);
self.total = self.total.saturating_add(&value);
}
}
/// Fetch the current rolling total that we've accumulated. Note that this
@@ -191,13 +190,19 @@ mod test {
.time_source(UserTimeSource(start_time))
.start();
rolling_total.time_source().increment_by(Duration::from_millis(300));
rolling_total
.time_source()
.increment_by(Duration::from_millis(300));
rolling_total.push(1);
rolling_total.time_source().increment_by(Duration::from_millis(300));
rolling_total
.time_source()
.increment_by(Duration::from_millis(300));
rolling_total.push(10);
rolling_total.time_source().increment_by(Duration::from_millis(300));
rolling_total
.time_source()
.increment_by(Duration::from_millis(300));
rolling_total.push(-5);
assert_eq!(rolling_total.total(), 6);
@@ -218,13 +223,17 @@ mod test {
assert_eq!(rolling_total.averages().len(), 1);
assert_eq!(rolling_total.total(), 4);
rolling_total.time_source().increment_by(Duration::from_secs(3));
rolling_total
.time_source()
.increment_by(Duration::from_secs(3));
rolling_total.push(1);
assert_eq!(rolling_total.averages().len(), 2);
assert_eq!(rolling_total.total(), 5);
rolling_total.time_source().increment_by(Duration::from_secs(1));
rolling_total
.time_source()
.increment_by(Duration::from_secs(1));
rolling_total.push(10);
assert_eq!(rolling_total.averages().len(), 3);
@@ -233,25 +242,30 @@ mod test {
// Jump precisely to the end of the window. Now, pushing a
// value will displace the first one (4). Note: if no value
// is pushed, this time change will have no effect.
rolling_total.time_source().increment_by(Duration::from_secs(8));
rolling_total
.time_source()
.increment_by(Duration::from_secs(8));
rolling_total.push(20);
assert_eq!(rolling_total.averages().len(), 3);
assert_eq!(rolling_total.total(), 15 + 20 - 4);
// Jump so that only the last value is still within the window:
rolling_total.time_source().increment_by(Duration::from_secs(9));
rolling_total
.time_source()
.increment_by(Duration::from_secs(9));
rolling_total.push(1);
assert_eq!(rolling_total.averages().len(), 2);
assert_eq!(rolling_total.total(), 21);
// Jump so that everything is out of scope (just about!):
rolling_total.time_source().increment_by(Duration::from_secs(10));
rolling_total
.time_source()
.increment_by(Duration::from_secs(10));
rolling_total.push(1);
assert_eq!(rolling_total.averages().len(), 1);
assert_eq!(rolling_total.total(), 1);
}
}
}
+4 -4
View File
@@ -7,9 +7,9 @@ mod real_ip;
use std::{collections::HashSet, net::IpAddr, time::Duration};
use aggregator::{Aggregator, FromWebsocket};
use common::byte_size::ByteSize;
use common::http_utils;
use common::node_message;
use common::byte_size::ByteSize;
use common::rolling_total::RollingTotalBuilder;
use futures::{channel::mpsc, SinkExt, StreamExt};
use http::Uri;
@@ -54,7 +54,7 @@ struct Opts {
/// rolling window of 10 seconds, and so spikes beyond this limit are allowed as long as
/// the average traffic in the last 10 seconds falls below this value.
#[structopt(long, default_value = "512k")]
max_node_data_per_second: ByteSize
max_node_data_per_second: ByteSize,
}
#[tokio::main]
@@ -100,7 +100,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
ws_recv,
tx_to_aggregator,
max_nodes_per_connection,
bytes_per_second
bytes_per_second,
)
.await;
log::info!("Closing /submit connection from {:?}", addr);
@@ -130,7 +130,7 @@ async fn handle_node_websocket_connection<S>(
mut ws_recv: http_utils::WsReceiver,
mut tx_to_aggregator: S,
max_nodes_per_connection: usize,
bytes_per_second: ByteSize
bytes_per_second: ByteSize,
) -> (S, http_utils::WsSender)
where
S: futures::Sink<FromWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,