Telemetry refactor (#757)

* Telemetry Refactor

* Cleanup

* Sleep after clearing Sender on error
This commit is contained in:
Maciej Hirsz
2018-09-17 18:43:47 +02:00
committed by Gav Wood
parent 597ea41cd4
commit 263786df05
3 changed files with 127 additions and 96 deletions
+82 -63
View File
@@ -24,7 +24,7 @@
// end::description[]
extern crate parking_lot;
extern crate websocket as ws;
extern crate ws;
extern crate slog_async;
extern crate slog_json;
#[macro_use]
@@ -33,7 +33,8 @@ extern crate log;
extern crate slog;
extern crate slog_scope;
use std::{io, time};
use std::{io, time, thread};
use std::sync::Arc;
use parking_lot::Mutex;
use slog::Drain;
pub use slog_scope::with_logger;
@@ -54,17 +55,9 @@ const CHANNEL_SIZE: usize = 262144;
/// Initialise telemetry.
pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard {
let client = ws::ClientBuilder::new(&config.url).ok().and_then(|mut x| x.connect(None).ok());
let log = slog::Logger::root(
slog_async::Async::new(
slog_json::Json::default(
TelemetryWriter {
buffer: vec![],
out: Mutex::new(client),
config,
last_time: None, // ensures that on_connect will be called.
}
).fuse()
slog_json::Json::default(TelemetryWriter::new(config)).fuse()
).chan_size(CHANNEL_SIZE)
.overflow_strategy(slog_async::OverflowStrategy::DropAndReport)
.build().fuse(), o!()
@@ -78,81 +71,107 @@ macro_rules! telemetry {
( $($t:tt)* ) => { $crate::with_logger(|l| slog_info!(l, $($t)* )) }
}
struct TelemetryWriter {
buffer: Vec<u8>,
out: Mutex<Option<ws::sync::Client<Box<ws::stream::sync::NetworkStream + Send>>>>,
config: TelemetryConfig,
last_time: Option<time::Instant>,
struct Connection<'a> {
out: ws::Sender,
out_sync: &'a Mutex<Option<ws::Sender>>,
config: &'a TelemetryConfig,
}
/// Every two minutes we reconnect to the telemetry server otherwise we don't get notified
/// of a flakey connection that has been dropped and needs to be reconnected. We can remove
/// this once we introduce a keepalive ping/pong.
const RECONNECT_PERIOD: u64 = 120;
impl<'a> Connection<'a> {
fn new(out: ws::Sender, out_sync: &'a Mutex<Option<ws::Sender>>, config: &'a TelemetryConfig) -> Self {
Connection {
out,
out_sync,
config,
}
}
}
impl<'a> ws::Handler for Connection<'a> {
fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> {
trace!(target: "telemetry", "Connected!");
*self.out_sync.lock() = Some(self.out.clone());
(self.config.on_connect)();
Ok(())
}
fn on_close(&mut self, code: ws::CloseCode, reason: &str) {
*self.out_sync.lock() = None;
trace!(target: "telemetry", "Connection closing due to ({:?}) {}", code, reason);
}
fn on_error(&mut self, _: ws::Error) {
*self.out_sync.lock() = None;
// Sleep to ensure that reconnecting isn't spamming logs.
// This happens in it's own thread so it won't block anything.
thread::sleep(time::Duration::from_millis(1000));
}
}
struct TelemetryWriter {
buffer: Vec<u8>,
out: Arc<Mutex<Option<ws::Sender>>>,
}
impl TelemetryWriter {
fn ensure_connected(&mut self) {
let mut client = self.out.lock();
fn new(config: TelemetryConfig) -> Self {
let out_sync = Arc::new(Mutex::new(None));
let out = out_sync.clone();
let controlled_disconnect = if let Some(t) = self.last_time {
if t.elapsed().as_secs() > RECONNECT_PERIOD && client.is_some() {
trace!(target: "telemetry", "Performing controlled drop of the telemetry connection.");
let _ = client.as_mut().and_then(|socket|
socket.send_message(&ws::Message::text("{\"msg\":\"system.reconnect\"}")).ok()
);
*client = None;
true
} else {
false
}
} else {
false
};
thread::spawn(move || {
loop {
trace!(target: "telemetry", "Connecting to Telemetry...");
let just_connected = if client.is_none() {
if !controlled_disconnect {
info!(target: "telemetry", "Connection dropped unexpectedly. Reconnecting to telemetry server...");
let _ = ws::connect(config.url.as_str(), |out| Connection::new(out, &*out_sync, &config));
}
*client = ws::ClientBuilder::new(&self.config.url).ok().and_then(|mut x| x.connect(None).ok());
client.is_some()
} else {
self.last_time.is_none()
};
});
drop(client);
if just_connected {
if !controlled_disconnect {
info!("Reconnected to telemetry server: {}", self.config.url);
}
self.last_time = Some(time::Instant::now());
(self.config.on_connect)();
TelemetryWriter {
buffer: Vec::new(),
out,
}
}
}
impl io::Write for TelemetryWriter {
fn write(&mut self, msg: &[u8]) -> io::Result<usize> {
if msg.iter().any(|x| *x == b'\n') {
let mut iter = msg.split(|x| *x == b'\n');
let first = iter.next().expect("Split iterator always has at least one element; qed");
self.buffer.extend_from_slice(first);
// Flush for each occurrence of new line character
for continued in iter {
let _ = self.flush();
} else {
self.buffer.extend_from_slice(msg);
self.buffer.extend_from_slice(continued);
}
Ok(msg.len())
}
fn flush(&mut self) -> io::Result<()> {
self.ensure_connected();
if self.buffer.is_empty() {
return Ok(());
}
if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) {
let mut out = self.out.lock();
let mut l = self.out.lock();
let socket_closed = if let Some(ref mut socket) = *l {
if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) {
let r = socket.send_message(&ws::Message::text(s));
let error = if let Some(ref mut o) = *out {
let r = o.send(s);
trace!(target: "telemetry", "Sent to telemetry: {} -> {:?}", s, r);
r.is_err()
} else { false }
} else { false };
if socket_closed {
*l = None;
} else {
trace!(target: "telemetry", "Telemetry socket closed, failed to send: {}", s);
false
};
if error {
*out = None;
}
}
self.buffer.clear();
Ok(())