mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-06-09 20:21:01 +00:00
improve socket channel close handling, and test the node banning (roughly)
This commit is contained in:
@@ -232,7 +232,11 @@ mod test {
|
||||
// Regardless of the exact time that's elapsed, we'll end up with buckets that
|
||||
// are exactly granularity spacing (or multiples of) apart.
|
||||
assert_eq!(
|
||||
rolling_total.averages().into_iter().copied().collect::<Vec<_>>(),
|
||||
rolling_total
|
||||
.averages()
|
||||
.into_iter()
|
||||
.copied()
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
(start_time, 1),
|
||||
(start_time + granularity, 2),
|
||||
|
||||
@@ -63,20 +63,22 @@ impl Connection {
|
||||
|
||||
// Receive messages from the socket and post them out:
|
||||
let (mut tx_to_external, rx_from_ws) = mpsc::unbounded();
|
||||
let (tx_has_closed, mut rx_has_closed) = futures::channel::oneshot::channel();
|
||||
tokio::spawn(async move {
|
||||
let mut data = Vec::with_capacity(128);
|
||||
loop {
|
||||
// Clear the buffer and wait for the next message to arrive:
|
||||
data.clear();
|
||||
|
||||
let message_data = match ws_from_connection.receive_data(&mut data).await {
|
||||
Err(e) => {
|
||||
// Couldn't receive data may mean all senders are gone, so log
|
||||
// the error and shut this down:
|
||||
// Couldn't receive data means some issue with the connection. Log
|
||||
// the error, and close the other half of the connection too,
|
||||
// so the associated channels close gracefully.
|
||||
log::error!(
|
||||
"Shutting down websocket connection: Failed to receive data: {}",
|
||||
e
|
||||
);
|
||||
let _ = tx_has_closed.send(());
|
||||
break;
|
||||
}
|
||||
Ok(data) => data,
|
||||
@@ -93,11 +95,11 @@ impl Connection {
|
||||
|
||||
if let Err(e) = tx_to_external.send(msg).await {
|
||||
// Failure to send likely means that the recv has been dropped,
|
||||
// so let's drop this loop too.
|
||||
log::error!(
|
||||
"Shutting down websocket connection: Failed to send data out: {}",
|
||||
e
|
||||
);
|
||||
// so let's drop this loop too. An issue with the channel doesn't
|
||||
// mean that our socket connection has failed though, so we make no
|
||||
// attempt to close the other half of our connection here (we may
|
||||
// still be happily sending messages even if we dropped the receiver)
|
||||
log::error!("Failed to send data out: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -106,7 +108,18 @@ impl Connection {
|
||||
// Receive messages externally to send to the socket.
|
||||
let (tx_to_ws, mut rx_from_external) = mpsc::unbounded();
|
||||
tokio::spawn(async move {
|
||||
while let Some(msg) = rx_from_external.next().await {
|
||||
loop {
|
||||
let msg = tokio::select! {
|
||||
msg = rx_from_external.next() => { msg },
|
||||
// Websocket connection closed? Don't wait for incoming message; break immediately.
|
||||
_ = &mut rx_has_closed => { break },
|
||||
};
|
||||
|
||||
let msg = match msg {
|
||||
None => break,
|
||||
Some(msg) => msg,
|
||||
};
|
||||
|
||||
match msg {
|
||||
SentMessageInternal::Message(SentMessage::Text(s)) => {
|
||||
if let Err(e) = ws_to_connection.send_text_owned(s).await {
|
||||
|
||||
@@ -56,7 +56,7 @@ impl Sender {
|
||||
Ok(())
|
||||
}
|
||||
/// Returns whether this channel is closed.
|
||||
pub fn is_closed(&mut self) -> bool {
|
||||
pub fn is_closed(&self) -> bool {
|
||||
self.inner.is_closed()
|
||||
}
|
||||
/// Unbounded send will always queue the message and doesn't
|
||||
|
||||
@@ -14,9 +14,24 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
//! General end-to-end tests
|
||||
/*!
|
||||
General end-to-end tests
|
||||
|
||||
Note that on MacOS inparticular, you may need to increase some limits to be
|
||||
able to open a large number of connections and run some of the tests.
|
||||
Try running these:
|
||||
|
||||
```sh
|
||||
sudo sysctl -w kern.maxfiles=50000
|
||||
sudo sysctl -w kern.maxfilesperproc=50000
|
||||
ulimit -n 50000
|
||||
sudo sysctl -w kern.ipc.somaxconn=50000
|
||||
sudo sysctl -w kern.ipc.maxsockbuf=16777216
|
||||
```
|
||||
*/
|
||||
|
||||
use common::node_types::BlockHash;
|
||||
use common::ws_client::SentMessage;
|
||||
use serde_json::json;
|
||||
use std::time::Duration;
|
||||
use test_utils::{
|
||||
@@ -506,6 +521,57 @@ async fn feed_can_subscribe_and_unsubscribe_from_chain() {
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
/// If a node sends more than some rolling average amount of data, it'll be booted.
|
||||
#[tokio::test]
|
||||
async fn node_banned_if_it_sends_too_much_data() {
|
||||
async fn try_send_data(max_bytes: usize, send_msgs: usize, bytes_per_msg: usize) -> bool {
|
||||
let mut server = start_server(
|
||||
false,
|
||||
CoreOpts::default(),
|
||||
ShardOpts {
|
||||
// Remember, this is (currently) averaged over the last 10 seconds,
|
||||
// so we need to send 10x this amount of data for an imemdiate ban:
|
||||
max_node_data_per_second: Some(max_bytes),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
// Give us a shard to talk to:
|
||||
let shard_id = server.add_shard().await.unwrap();
|
||||
let (node_tx, _node_rx) = server
|
||||
.get_shard(shard_id)
|
||||
.unwrap()
|
||||
.connect_node()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Send the data requested to the shard:
|
||||
for _ in 0..send_msgs {
|
||||
node_tx
|
||||
.unbounded_send(SentMessage::Binary(vec![1; bytes_per_msg]))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// Wait a little for the shard to react and cut off the connection (or not):
|
||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||
|
||||
// Has the connection been closed?
|
||||
node_tx.is_closed()
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
try_send_data(1000, 10, 1000).await,
|
||||
false,
|
||||
"shouldn't be closed; we didn't exceed 10x threshold"
|
||||
);
|
||||
assert_eq!(
|
||||
try_send_data(999, 10, 1000).await,
|
||||
true,
|
||||
"should be closed; we sent just over 10x the block threshold"
|
||||
);
|
||||
}
|
||||
|
||||
/// Feeds will be disconnected if they can't receive messages quickly enough.
|
||||
#[tokio::test]
|
||||
async fn slow_feeds_are_disconnected() {
|
||||
|
||||
@@ -15,9 +15,9 @@
|
||||
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::net::IpAddr;
|
||||
use std::sync::{ Mutex, Arc };
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// Keep track of nodes that have been blocked.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -26,7 +26,7 @@ pub struct BlockedAddrs(Arc<BlockAddrsInner>);
|
||||
#[derive(Debug)]
|
||||
struct BlockAddrsInner {
|
||||
block_duration: Duration,
|
||||
inner: Mutex<HashMap<IpAddr, (&'static str, Instant)>>
|
||||
inner: Mutex<HashMap<IpAddr, (&'static str, Instant)>>,
|
||||
}
|
||||
|
||||
impl BlockedAddrs {
|
||||
@@ -35,7 +35,7 @@ impl BlockedAddrs {
|
||||
pub fn new(block_duration: Duration) -> BlockedAddrs {
|
||||
BlockedAddrs(Arc::new(BlockAddrsInner {
|
||||
block_duration,
|
||||
inner: Mutex::new(HashMap::new())
|
||||
inner: Mutex::new(HashMap::new()),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -52,8 +52,8 @@ impl BlockedAddrs {
|
||||
let mut map = self.0.inner.lock().unwrap();
|
||||
|
||||
let (reason, time) = match map.get(addr) {
|
||||
Some(&(reason,time)) => (reason, time),
|
||||
None => return None
|
||||
Some(&(reason, time)) => (reason, time),
|
||||
None => return None,
|
||||
};
|
||||
|
||||
if time + self.0.block_duration < Instant::now() {
|
||||
@@ -63,4 +63,4 @@ impl BlockedAddrs {
|
||||
Some(reason)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,14 +16,15 @@
|
||||
|
||||
#[warn(missing_docs)]
|
||||
mod aggregator;
|
||||
mod blocked_addrs;
|
||||
mod connection;
|
||||
mod json_message;
|
||||
mod real_ip;
|
||||
mod blocked_addrs;
|
||||
|
||||
use std::{collections::HashSet, net::IpAddr, time::Duration};
|
||||
|
||||
use aggregator::{Aggregator, FromWebsocket};
|
||||
use blocked_addrs::BlockedAddrs;
|
||||
use common::byte_size::ByteSize;
|
||||
use common::http_utils;
|
||||
use common::node_message;
|
||||
@@ -33,7 +34,6 @@ use http::Uri;
|
||||
use hyper::{Method, Response};
|
||||
use simple_logger::SimpleLogger;
|
||||
use structopt::StructOpt;
|
||||
use blocked_addrs::BlockedAddrs;
|
||||
|
||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
|
||||
@@ -79,7 +79,7 @@ struct Opts {
|
||||
/// How many seconds is a "/feed" connection that violates the '--max-node-data-per-second'
|
||||
/// value prevented from reconnecting to this shard for, in seconds.
|
||||
#[structopt(long, default_value = "600")]
|
||||
node_block_seconds: u64
|
||||
node_block_seconds: u64,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -118,10 +118,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
|
||||
let real_addr = real_ip::real_ip(addr, req.headers());
|
||||
|
||||
if let Some(reason) = block_list.blocked_reason(&real_addr) {
|
||||
return Ok(Response::builder()
|
||||
.status(403)
|
||||
.body(reason.into())
|
||||
.unwrap())
|
||||
return Ok(Response::builder().status(403).body(reason.into()).unwrap());
|
||||
}
|
||||
|
||||
Ok(http_utils::upgrade_to_websocket(
|
||||
@@ -136,7 +133,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
|
||||
tx_to_aggregator,
|
||||
max_nodes_per_connection,
|
||||
bytes_per_second,
|
||||
block_list
|
||||
block_list,
|
||||
)
|
||||
.await;
|
||||
log::info!("Closing /submit connection from {:?}", addr);
|
||||
@@ -167,7 +164,7 @@ async fn handle_node_websocket_connection<S>(
|
||||
mut tx_to_aggregator: S,
|
||||
max_nodes_per_connection: usize,
|
||||
bytes_per_second: ByteSize,
|
||||
block_list: BlockedAddrs
|
||||
block_list: BlockedAddrs,
|
||||
) -> (S, http_utils::WsSender)
|
||||
where
|
||||
S: futures::Sink<FromWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
|
||||
|
||||
@@ -40,7 +40,7 @@ impl Default for ShardOpts {
|
||||
Self {
|
||||
max_nodes_per_connection: None,
|
||||
max_node_data_per_second: None,
|
||||
node_block_seconds: None
|
||||
node_block_seconds: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -125,9 +125,7 @@ pub async fn start_server(
|
||||
|
||||
// Append additional opts to the core command
|
||||
if let Some(val) = core_opts.feed_timeout {
|
||||
core_command = core_command
|
||||
.arg("--feed-timeout")
|
||||
.arg(val.to_string());
|
||||
core_command = core_command.arg("--feed-timeout").arg(val.to_string());
|
||||
}
|
||||
|
||||
// Star the server
|
||||
|
||||
Reference in New Issue
Block a user