diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 59c8980d14..94d981dab9 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -111,14 +111,6 @@ name = "base58" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "base64" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "byteorder 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "base64" version = "0.6.0" @@ -896,6 +888,11 @@ name = "lazycell" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "lazycell" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "libc" version = "0.2.43" @@ -1324,6 +1321,17 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "mio-extras" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazycell 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "mio-uds" version = "0.6.6" @@ -2210,6 +2218,11 @@ name = "sha1" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "sha1" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "sha2" version = "0.7.1" @@ -3041,7 +3054,7 @@ dependencies = [ "slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-json 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "slog-scope 4.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "websocket 0.20.2 (registry+https://github.com/rust-lang/crates.io-index)", + "ws 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3636,27 +3649,6 @@ dependencies = [ "parity-wasm 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "websocket" -version = "0.20.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "base64 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", - "bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", - "byteorder 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", - "hyper 0.10.13 (registry+https://github.com/rust-lang/crates.io-index)", - "native-tls 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", - "sha1 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "websocket" version = "0.20.3" @@ -3723,6 +3715,24 @@ dependencies = [ "url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "ws" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "openssl 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ws2_32-sys" version = "0.2.1" @@ -3771,7 +3781,6 @@ dependencies = [ "checksum backtrace 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "dbdd17cd962b570302f5297aea8648d5923e22e555c2ed2d8b2e34eca646bf6d" "checksum backtrace-sys 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)" = "bff67d0c06556c0b8e6b5f090f0eac52d950d9dfd1d35ba04e4ca3543eaf6a7e" "checksum base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5024ee8015f02155eee35c711107ddd9a9bf3cb689cf2a9089c97e79b6e1ae83" -"checksum base64 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "30e93c03064e7590d0466209155251b90c22e37fab1daf2771582598b5827557" "checksum base64 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96434f987501f0ed4eb336a411e0631ecd1afa11574fe148587adc4ff96143c9" "checksum base64 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5032d51da2741729bfdaeb2664d9b8c6d9fd1e2b90715c660b6def36628499c2" "checksum base64 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9263aa6a38da271eec5c91a83ce1e800f093c8535788d403d626d8d5c3f8f007" @@ -3866,6 +3875,7 @@ dependencies = [ "checksum lazy_static 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "76f033c7ad61445c5b347c7382dd1237847eb1bce590fe50365dcb33d546be73" "checksum lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e6412c5e2ad9584b0b8e979393122026cdd6d2a80b933f890dcd694ddbe73739" "checksum lazycell 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a6f08839bc70ef4a3fe1d566d5350f519c5912ea86be0df1740a7d247c7fc0ef" +"checksum lazycell 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e26d4c411b39f0afcf2ba6fe502be90e6c9b299c952dbd86124782520a13cffd" "checksum libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)" = "76e3a3ef172f1a0b9a9ff0dd1491ae5e6c948b94479a3021819ba7d860c8645d" "checksum libp2p 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=5980a4538ef6fc8af450893acb01290eaed136de)" = "" "checksum libp2p-core 0.1.0 (git+https://github.com/libp2p/rust-libp2p?rev=5980a4538ef6fc8af450893acb01290eaed136de)" = "" @@ -3899,6 +3909,7 @@ dependencies = [ "checksum mime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0" "checksum mime 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "0b28683d0b09bbc20be1c9b3f6f24854efb1356ffcffee08ea3f6e65596e85fa" "checksum mio 0.6.14 (registry+https://github.com/rust-lang/crates.io-index)" = "6d771e3ef92d58a8da8df7d6976bfca9371ed1de6619d9d5a5ce5b1f29b85bfe" +"checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40" "checksum mio-uds 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "84c7b5caa3a118a6e34dbac36504503b1e8dc5835e833306b9d6af0e05929f79" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" "checksum multiaddr 0.3.0 (git+https://github.com/libp2p/rust-libp2p?rev=5980a4538ef6fc8af450893acb01290eaed136de)" = "" @@ -3983,6 +3994,7 @@ dependencies = [ "checksum serde_json 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c6908c7b925cd6c590358a4034de93dbddb20c45e1d021931459fd419bf0e2" "checksum sha1 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cc30b1e1e8c40c121ca33b86c23308a090d19974ef001b4bf6e61fd1a0fb095c" "checksum sha1 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "171698ce4ec7cbb93babeb3190021b4d72e96ccb98e33d277ae4ea959d6f2d9e" +"checksum sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2579985fda508104f7587689507983eadd6a6e84dd35d6d115361f530916fa0d" "checksum sha2 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9eb6be24e4c23a84d7184280d2722f7f2731fcdd4a9d886efbfe4413e4847ea0" "checksum shell32-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9ee04b46101f57121c9da2b151988283b6beb79b34f5bb29a58ee48cb695122c" "checksum skeptic 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24ebf8a06f5f8bae61ae5bbc7af7aac4ef6907ae975130faba1199e5fe82256a" @@ -4061,7 +4073,6 @@ dependencies = [ "checksum wabt-sys 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ca77c6b934a2b32618941b2f565aac43b8cb7141378c3b4fba4d8fcdcd57da3" "checksum want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a05d9d966753fa4b5c8db73fcab5eed4549cfe0e1e4e66911e5564a0085c35d1" "checksum wasmi 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "522fe3fdd44a56f25cd5ddcd8ccdb1cf2e982ceb28fcb00f41d8a018ae5245a8" -"checksum websocket 0.20.2 (registry+https://github.com/rust-lang/crates.io-index)" = "eb277e7f4c23dc49176f74ae200e77651764efb2c25f56ad2d22623b63826369" "checksum websocket 0.20.3 (git+https://github.com/tomaka/rust-websocket?branch=send)" = "" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "04e3bd221fcbe8a271359c04f21a76db7d0c6028862d1bb5512d85e1e2eb5bb3" @@ -4069,6 +4080,7 @@ dependencies = [ "checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" "checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" "checksum ws 0.7.5 (git+https://github.com/tomusdrw/ws-rs)" = "" +"checksum ws 0.7.8 (registry+https://github.com/rust-lang/crates.io-index)" = "d2c221321dca56e6a80aa179d562e1fbe6ae116aeaa9205c76fa64e9e3c49dfc" "checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" "checksum xdg 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a66b7c2281ebde13cf4391d70d4c7e5946c3c25e72a7b859ca8f677dcd0b0c61" "checksum yaml-rust 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992" diff --git a/substrate/core/telemetry/Cargo.toml b/substrate/core/telemetry/Cargo.toml index 2e4e8db6d6..0298707b63 100644 --- a/substrate/core/telemetry/Cargo.toml +++ b/substrate/core/telemetry/Cargo.toml @@ -12,4 +12,4 @@ slog = "^2" slog-json = "^2" slog-async = "^2" slog-scope = "^4" -websocket = "^0.20" +ws = { version = "^0.7", features = ["ssl"] } diff --git a/substrate/core/telemetry/src/lib.rs b/substrate/core/telemetry/src/lib.rs index 21e8685b36..64e51563e6 100644 --- a/substrate/core/telemetry/src/lib.rs +++ b/substrate/core/telemetry/src/lib.rs @@ -24,7 +24,7 @@ // end::description[] extern crate parking_lot; -extern crate websocket as ws; +extern crate ws; extern crate slog_async; extern crate slog_json; #[macro_use] @@ -33,7 +33,8 @@ extern crate log; extern crate slog; extern crate slog_scope; -use std::{io, time}; +use std::{io, time, thread}; +use std::sync::Arc; use parking_lot::Mutex; use slog::Drain; pub use slog_scope::with_logger; @@ -54,17 +55,9 @@ const CHANNEL_SIZE: usize = 262144; /// Initialise telemetry. pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard { - let client = ws::ClientBuilder::new(&config.url).ok().and_then(|mut x| x.connect(None).ok()); let log = slog::Logger::root( slog_async::Async::new( - slog_json::Json::default( - TelemetryWriter { - buffer: vec![], - out: Mutex::new(client), - config, - last_time: None, // ensures that on_connect will be called. - } - ).fuse() + slog_json::Json::default(TelemetryWriter::new(config)).fuse() ).chan_size(CHANNEL_SIZE) .overflow_strategy(slog_async::OverflowStrategy::DropAndReport) .build().fuse(), o!() @@ -78,81 +71,107 @@ macro_rules! telemetry { ( $($t:tt)* ) => { $crate::with_logger(|l| slog_info!(l, $($t)* )) } } -struct TelemetryWriter { - buffer: Vec, - out: Mutex>>>, - config: TelemetryConfig, - last_time: Option, +struct Connection<'a> { + out: ws::Sender, + out_sync: &'a Mutex>, + config: &'a TelemetryConfig, } -/// Every two minutes we reconnect to the telemetry server otherwise we don't get notified -/// of a flakey connection that has been dropped and needs to be reconnected. We can remove -/// this once we introduce a keepalive ping/pong. -const RECONNECT_PERIOD: u64 = 120; +impl<'a> Connection<'a> { + fn new(out: ws::Sender, out_sync: &'a Mutex>, config: &'a TelemetryConfig) -> Self { + Connection { + out, + out_sync, + config, + } + } +} + +impl<'a> ws::Handler for Connection<'a> { + fn on_open(&mut self, _: ws::Handshake) -> ws::Result<()> { + trace!(target: "telemetry", "Connected!"); + + *self.out_sync.lock() = Some(self.out.clone()); + (self.config.on_connect)(); + Ok(()) + } + + fn on_close(&mut self, code: ws::CloseCode, reason: &str) { + *self.out_sync.lock() = None; + + trace!(target: "telemetry", "Connection closing due to ({:?}) {}", code, reason); + } + + fn on_error(&mut self, _: ws::Error) { + *self.out_sync.lock() = None; + + // Sleep to ensure that reconnecting isn't spamming logs. + // This happens in it's own thread so it won't block anything. + thread::sleep(time::Duration::from_millis(1000)); + } +} + +struct TelemetryWriter { + buffer: Vec, + out: Arc>>, +} impl TelemetryWriter { - fn ensure_connected(&mut self) { - let mut client = self.out.lock(); + fn new(config: TelemetryConfig) -> Self { + let out_sync = Arc::new(Mutex::new(None)); + let out = out_sync.clone(); - let controlled_disconnect = if let Some(t) = self.last_time { - if t.elapsed().as_secs() > RECONNECT_PERIOD && client.is_some() { - trace!(target: "telemetry", "Performing controlled drop of the telemetry connection."); - let _ = client.as_mut().and_then(|socket| - socket.send_message(&ws::Message::text("{\"msg\":\"system.reconnect\"}")).ok() - ); - *client = None; - true - } else { - false - } - } else { - false - }; + thread::spawn(move || { + loop { + trace!(target: "telemetry", "Connecting to Telemetry..."); - let just_connected = if client.is_none() { - if !controlled_disconnect { - info!(target: "telemetry", "Connection dropped unexpectedly. Reconnecting to telemetry server..."); + let _ = ws::connect(config.url.as_str(), |out| Connection::new(out, &*out_sync, &config)); } - *client = ws::ClientBuilder::new(&self.config.url).ok().and_then(|mut x| x.connect(None).ok()); - client.is_some() - } else { - self.last_time.is_none() - }; + }); - drop(client); - if just_connected { - if !controlled_disconnect { - info!("Reconnected to telemetry server: {}", self.config.url); - } - self.last_time = Some(time::Instant::now()); - (self.config.on_connect)(); + TelemetryWriter { + buffer: Vec::new(), + out, } } } impl io::Write for TelemetryWriter { fn write(&mut self, msg: &[u8]) -> io::Result { - if msg.iter().any(|x| *x == b'\n') { + let mut iter = msg.split(|x| *x == b'\n'); + let first = iter.next().expect("Split iterator always has at least one element; qed"); + + self.buffer.extend_from_slice(first); + + // Flush for each occurrence of new line character + for continued in iter { let _ = self.flush(); - } else { - self.buffer.extend_from_slice(msg); + self.buffer.extend_from_slice(continued); } + Ok(msg.len()) } fn flush(&mut self) -> io::Result<()> { - self.ensure_connected(); + if self.buffer.is_empty() { + return Ok(()); + } + if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) { + let mut out = self.out.lock(); - let mut l = self.out.lock(); - let socket_closed = if let Some(ref mut socket) = *l { - if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) { - let r = socket.send_message(&ws::Message::text(s)); + let error = if let Some(ref mut o) = *out { + let r = o.send(s); trace!(target: "telemetry", "Sent to telemetry: {} -> {:?}", s, r); + r.is_err() - } else { false } - } else { false }; - if socket_closed { - *l = None; + } else { + trace!(target: "telemetry", "Telemetry socket closed, failed to send: {}", s); + false + }; + + if error { + *out = None; + } } self.buffer.clear(); Ok(())