mirror of
https://github.com/pezkuwichain/pezkuwi-telemetry.git
synced 2026-04-29 23:07:59 +00:00
Tidy up stale connections. (#406)
* If messageId changes and network ID doesn't, remove 'old' message_id * Boot nodes/connection when no recent messages received for it * Separate task needed for soketto recv to avoid cancel-safety issues with new interval * Wee tidy up * cargo fmt * Add some logging around node adding/removing * Another log info msg * a bit of tidy up * bump stale node timeout to 60s
This commit is contained in:
Generated
+4
@@ -31,6 +31,9 @@ name = "arrayvec"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "be4dc07131ffa69b8072d35f5007352af944213cde02545e2103680baed38fcd"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atty"
|
||||
@@ -198,6 +201,7 @@ name = "common"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arrayvec",
|
||||
"base64",
|
||||
"bimap",
|
||||
"bincode",
|
||||
|
||||
@@ -28,6 +28,7 @@ soketto = "0.6.0"
|
||||
thiserror = "1.0.24"
|
||||
tokio = { version = "1.8.2", features = ["full"] }
|
||||
tokio-util = { version = "0.6", features = ["compat"] }
|
||||
arrayvec = { version = "0.7.1", features = ["serde"] }
|
||||
|
||||
[dev-dependencies]
|
||||
bincode = "1.3.3"
|
||||
|
||||
@@ -141,6 +141,7 @@ impl Payload {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use arrayvec::ArrayString;
|
||||
use bincode::Options;
|
||||
|
||||
// Without adding a derive macro and marker trait (and enforcing their use), we don't really
|
||||
@@ -166,7 +167,7 @@ mod tests {
|
||||
implementation: "foo".into(),
|
||||
version: "foo".into(),
|
||||
validator: None,
|
||||
network_id: None,
|
||||
network_id: ArrayString::new(),
|
||||
startup_time: None,
|
||||
},
|
||||
}),
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
//! These types are partly used in [`crate::node_message`], but also stored and used
|
||||
//! more generally through the application.
|
||||
|
||||
use arrayvec::ArrayString;
|
||||
use serde::ser::{SerializeTuple, Serializer};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -25,6 +26,7 @@ use crate::{time, MeanList};
|
||||
pub type BlockNumber = u64;
|
||||
pub type Timestamp = u64;
|
||||
pub use primitive_types::H256 as BlockHash;
|
||||
pub type NetworkId = ArrayString<64>;
|
||||
|
||||
/// Basic node details.
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
@@ -34,7 +36,7 @@ pub struct NodeDetails {
|
||||
pub implementation: Box<str>,
|
||||
pub version: Box<str>,
|
||||
pub validator: Option<Box<str>>,
|
||||
pub network_id: Option<Box<str>>,
|
||||
pub network_id: NetworkId,
|
||||
pub startup_time: Option<Box<str>>,
|
||||
}
|
||||
|
||||
|
||||
@@ -274,6 +274,7 @@ impl<'a> StateChain<'a> {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use common::node_types::NetworkId;
|
||||
|
||||
fn node(name: &str, chain: &str) -> NodeDetails {
|
||||
NodeDetails {
|
||||
@@ -282,7 +283,7 @@ mod test {
|
||||
implementation: "Bar".into(),
|
||||
version: "0.1".into(),
|
||||
validator: None,
|
||||
network_id: None,
|
||||
network_id: NetworkId::new(),
|
||||
startup_time: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -657,7 +657,10 @@ async fn e2e_slow_feeds_are_disconnected() {
|
||||
let (mut raw_feed_tx, mut raw_feed_rx) = server.get_core().connect_feed_raw().await.unwrap();
|
||||
|
||||
// Subscribe the feed:
|
||||
raw_feed_tx.send_text("subscribe:Polkadot").await.unwrap();
|
||||
raw_feed_tx
|
||||
.send_text("subscribe:0x0000000000000000000000000000000000000000000000000000000000000001")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Wait a little.. the feed hasn't been receiving messages so it should
|
||||
// be booted after ~a second.
|
||||
|
||||
@@ -73,6 +73,10 @@ pub enum FromWebsocket {
|
||||
message_id: node_message::NodeMessageId,
|
||||
payload: node_message::Payload,
|
||||
},
|
||||
/// remove a node with the given message ID
|
||||
Remove {
|
||||
message_id: node_message::NodeMessageId,
|
||||
},
|
||||
/// Make a note when the node disconnects.
|
||||
Disconnected,
|
||||
}
|
||||
@@ -247,6 +251,20 @@ impl Aggregator {
|
||||
.send_async(FromShardAggregator::UpdateNode { local_id, payload })
|
||||
.await;
|
||||
}
|
||||
ToAggregator::FromWebsocket(conn_id, FromWebsocket::Remove { message_id }) => {
|
||||
// Get the local ID, ignoring the message if none match:
|
||||
let local_id = match to_local_id.get_id(&(conn_id, message_id)) {
|
||||
Some(id) => id,
|
||||
None => continue,
|
||||
};
|
||||
|
||||
// Remove references to this single node:
|
||||
to_local_id.remove_by_id(local_id);
|
||||
muted.remove(&local_id);
|
||||
let _ = tx_to_telemetry_core
|
||||
.send_async(FromShardAggregator::RemoveNode { local_id })
|
||||
.await;
|
||||
}
|
||||
ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
|
||||
// Find all of the local IDs corresponding to the disconnected connection ID and
|
||||
// remove them, telling Telemetry Core about them too. This could be more efficient,
|
||||
|
||||
@@ -246,7 +246,7 @@ pub struct NodeDetails {
|
||||
pub implementation: Box<str>,
|
||||
pub version: Box<str>,
|
||||
pub validator: Option<Box<str>>,
|
||||
pub network_id: Option<Box<str>>,
|
||||
pub network_id: node_types::NetworkId,
|
||||
pub startup_time: Option<Box<str>>,
|
||||
}
|
||||
|
||||
|
||||
@@ -21,15 +21,20 @@ mod connection;
|
||||
mod json_message;
|
||||
mod real_ip;
|
||||
|
||||
use std::{collections::HashSet, net::IpAddr, time::Duration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::IpAddr,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use aggregator::{Aggregator, FromWebsocket};
|
||||
use blocked_addrs::BlockedAddrs;
|
||||
use common::byte_size::ByteSize;
|
||||
use common::http_utils;
|
||||
use common::node_message;
|
||||
use common::node_message::NodeMessageId;
|
||||
use common::rolling_total::RollingTotalBuilder;
|
||||
use futures::SinkExt;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use http::Uri;
|
||||
use hyper::{Method, Response};
|
||||
use simple_logger::SimpleLogger;
|
||||
@@ -91,6 +96,12 @@ struct Opts {
|
||||
/// on the machine. If no value is given, use an internal default that we have deemed sane.
|
||||
#[structopt(long)]
|
||||
worker_threads: Option<usize>,
|
||||
/// Roughly how long to wait in seconds for new telemetry data to arrive from a node. If
|
||||
/// telemetry for a node does not arrive in this time frame, we remove the corresponding node
|
||||
/// state, and if no messages are received on the connection at all in this time, it will be
|
||||
/// dropped.
|
||||
#[structopt(long, default_value = "60")]
|
||||
stale_node_timeout: u64,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
@@ -131,6 +142,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
|
||||
let socket_addr = opts.socket;
|
||||
let max_nodes_per_connection = opts.max_nodes_per_connection;
|
||||
let bytes_per_second = opts.max_node_data_per_second;
|
||||
let stale_node_timeout = Duration::from_secs(opts.stale_node_timeout);
|
||||
|
||||
let server = http_utils::start_server(socket_addr, move |addr, req| {
|
||||
let aggregator = aggregator.clone();
|
||||
@@ -165,6 +177,7 @@ async fn start_server(opts: Opts) -> anyhow::Result<()> {
|
||||
max_nodes_per_connection,
|
||||
bytes_per_second,
|
||||
block_list,
|
||||
stale_node_timeout,
|
||||
)
|
||||
.await;
|
||||
log::info!(
|
||||
@@ -200,10 +213,15 @@ async fn handle_node_websocket_connection<S>(
|
||||
max_nodes_per_connection: usize,
|
||||
bytes_per_second: ByteSize,
|
||||
block_list: BlockedAddrs,
|
||||
stale_node_timeout: Duration,
|
||||
) -> (S, http_utils::WsSender)
|
||||
where
|
||||
S: futures::Sink<FromWebsocket, Error = anyhow::Error> + Unpin + Send + 'static,
|
||||
{
|
||||
// Keep track of the message Ids that have been "granted access". We allow a maximum of
|
||||
// `max_nodes_per_connection` before ignoring others.
|
||||
let mut allowed_message_ids = HashMap::<NodeMessageId, Instant>::new();
|
||||
|
||||
// Limit the number of bytes based on a rolling total and the incoming bytes per second
|
||||
// that has been configured via the CLI opts.
|
||||
let bytes_per_second = bytes_per_second.num_bytes();
|
||||
@@ -212,46 +230,86 @@ where
|
||||
.window_size_multiple(10)
|
||||
.start();
|
||||
|
||||
// Track all of the message IDs that we've seen so far. If we exceed the
|
||||
// max_nodes_per_connection limit we ignore subsequent message IDs.
|
||||
let mut message_ids_seen = HashSet::new();
|
||||
|
||||
// This could be a oneshot channel, but it's useful to be able to clone
|
||||
// messages, and we can't clone oneshot channel senders.
|
||||
let (close_connection_tx, close_connection_rx) = flume::bounded(1);
|
||||
|
||||
// Tell the aggregator about this new connection, and give it a way to close this connection:
|
||||
let init_msg = FromWebsocket::Initialize {
|
||||
close_connection: close_connection_tx,
|
||||
close_connection: close_connection_tx.clone(),
|
||||
};
|
||||
if let Err(e) = tx_to_aggregator.send(init_msg).await {
|
||||
log::error!("Error sending message to aggregator: {}", e);
|
||||
return (tx_to_aggregator, ws_send);
|
||||
}
|
||||
|
||||
// Now we've "initialized", wait for messages from the node. Messages will
|
||||
// either be `SystemConnected` type messages that inform us that a new set
|
||||
// of messages with some message ID will be sent (a node could have more
|
||||
// than one of these), or updates linked to a specific message_id.
|
||||
// Receiving data isn't cancel safe, so let it happen in a separate task.
|
||||
// If this loop ends, the outer will receive a `None` message and end too.
|
||||
// If the outer loop ends, it fires a msg on `close_connection_rx` to ensure this ends too.
|
||||
let (ws_tx_atomic, mut ws_rx_atomic) = futures::channel::mpsc::unbounded();
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
let mut bytes = Vec::new();
|
||||
tokio::select! {
|
||||
// The close channel has fired, so end the loop. `ws_recv.receive_data` is
|
||||
// *not* cancel safe, but since we're closing the connection we don't care.
|
||||
_ = close_connection_rx.recv_async() => {
|
||||
log::info!("connection to {:?} being closed", real_addr);
|
||||
break
|
||||
},
|
||||
// Receive data and relay it on to our main select loop below.
|
||||
msg_info = ws_recv.receive_data(&mut bytes) => {
|
||||
if let Err(soketto::connection::Error::Closed) = msg_info {
|
||||
break;
|
||||
}
|
||||
if let Err(e) = msg_info {
|
||||
log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
|
||||
break;
|
||||
}
|
||||
if ws_tx_atomic.unbounded_send(bytes).is_err() {
|
||||
// The other end closed; end this loop.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// A periodic interval to check for stale nodes.
|
||||
let mut stale_interval = tokio::time::interval(stale_node_timeout / 2);
|
||||
|
||||
// Our main select loop atomically receives and handles telemetry messages from the node,
|
||||
// and periodically checks for stale connections to keep our ndoe state tidy.
|
||||
loop {
|
||||
let mut bytes = Vec::new();
|
||||
tokio::select! {
|
||||
// The close channel has fired, so end the loop. `ws_recv.receive_data` is
|
||||
// *not* cancel safe, but since we're closing the connection we don't care.
|
||||
_ = close_connection_rx.recv_async() => {
|
||||
log::info!("connection to {:?} being closed by aggregator", real_addr);
|
||||
break
|
||||
// We periodically check for stale message IDs and remove nodes associated with
|
||||
// them, to prevent a buildup. We boot the whole connection if no interpretable
|
||||
// messages have been sent at all in the time period.
|
||||
_ = stale_interval.tick() => {
|
||||
let stale_ids: Vec<NodeMessageId> = allowed_message_ids.iter()
|
||||
.filter(|(_, last_seen)| last_seen.elapsed() > stale_node_timeout)
|
||||
.map(|(&id, _)| id)
|
||||
.collect();
|
||||
|
||||
for &message_id in &stale_ids {
|
||||
log::info!("Removing stale node with message ID {} from {:?}", message_id, real_addr);
|
||||
allowed_message_ids.remove(&message_id);
|
||||
let _ = tx_to_aggregator.send(FromWebsocket::Remove { message_id } ).await;
|
||||
}
|
||||
|
||||
if !stale_ids.is_empty() && allowed_message_ids.is_empty() {
|
||||
// End the entire connection if no recent messages came in for any ID.
|
||||
log::info!("Closing stale connection from {:?}", real_addr);
|
||||
break;
|
||||
}
|
||||
},
|
||||
// A message was received; handle it:
|
||||
msg_info = ws_recv.receive_data(&mut bytes) => {
|
||||
// Handle the socket closing, or errors receiving the message.
|
||||
if let Err(soketto::connection::Error::Closed) = msg_info {
|
||||
break;
|
||||
}
|
||||
if let Err(e) = msg_info {
|
||||
log::error!("Shutting down websocket connection: Failed to receive data: {}", e);
|
||||
break;
|
||||
}
|
||||
// Handle messages received by the connected node.
|
||||
msg = ws_rx_atomic.next() => {
|
||||
// No more messages? break.
|
||||
let bytes = match msg {
|
||||
Some(bytes) => bytes,
|
||||
None => { break; }
|
||||
};
|
||||
|
||||
// Keep track of total bytes and bail if average over last 10 secs exceeds preference.
|
||||
rolling_total_bytes.push(bytes.len());
|
||||
@@ -283,21 +341,21 @@ where
|
||||
let message_id = node_message.id();
|
||||
let payload = node_message.into_payload();
|
||||
|
||||
// Ignore messages from IDs that exceed our limit:
|
||||
if message_ids_seen.contains(&message_id) {
|
||||
// continue on; we're happy
|
||||
} else if message_ids_seen.len() >= max_nodes_per_connection {
|
||||
// ignore this message; it's not a "seen" ID and we've hit our limit.
|
||||
continue;
|
||||
} else {
|
||||
// not seen ID, not hit limit; make note of new ID
|
||||
message_ids_seen.insert(message_id);
|
||||
}
|
||||
|
||||
// Until the aggregator receives an `Add` message, which we can create once
|
||||
// we see one of these SystemConnected ones, it will ignore messages with
|
||||
// the corresponding message_id.
|
||||
if let node_message::Payload::SystemConnected(info) = payload {
|
||||
// Too many nodes seen on this connection? Ignore this one.
|
||||
if allowed_message_ids.len() >= max_nodes_per_connection {
|
||||
log::info!("Ignoring new node from {:?} (we've hit the max of {} nodes per connection)", real_addr, max_nodes_per_connection);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Note of the message ID, allowing telemetry for it.
|
||||
allowed_message_ids.insert(message_id, Instant::now());
|
||||
|
||||
// Tell the aggregator loop about the new node.
|
||||
log::info!("Adding node with message ID {} from {:?}", message_id, real_addr);
|
||||
let _ = tx_to_aggregator.send(FromWebsocket::Add {
|
||||
message_id,
|
||||
ip: real_addr,
|
||||
@@ -307,14 +365,22 @@ where
|
||||
}
|
||||
// Anything that's not an "Add" is an Update. The aggregator will ignore
|
||||
// updates against a message_id that hasn't first been Added, above.
|
||||
else if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await {
|
||||
log::error!("Failed to send node message to aggregator: {}", e);
|
||||
continue;
|
||||
else {
|
||||
if let Some(last_seen) = allowed_message_ids.get_mut(&message_id) {
|
||||
*last_seen = Instant::now();
|
||||
if let Err(e) = tx_to_aggregator.send(FromWebsocket::Update { message_id, payload } ).await {
|
||||
log::error!("Failed to send node message to aggregator: {}", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure to kill off the receive-messages task if the main select loop ends:
|
||||
let _ = close_connection_tx.send(());
|
||||
|
||||
// Return what we need to close the connection gracefully:
|
||||
(tx_to_aggregator, ws_send)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user