From dd0eaa0d52c8ba1bd52a887d780f2bb3621b84e1 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Thu, 19 Jul 2018 12:08:34 +0200 Subject: [PATCH] Various small v0.2 improvements (#367) * Make telemetry less susceptible to flakey wifi * Update readme * Staging shouldn't autoconnect to telemetry * Don't try to output more than 1KB of hex to Display * Better logging of transactions * Grumbles * off-by-one --- substrate/README.md | 2 +- substrate/polkadot/cli/src/lib.rs | 2 +- .../polkadot/transaction-pool/src/lib.rs | 15 ++++- .../substrate/primitives/src/hexdisplay.rs | 14 ++++- substrate/substrate/telemetry/src/lib.rs | 56 ++++++++++++++----- 5 files changed, 68 insertions(+), 21 deletions(-) diff --git a/substrate/README.md b/substrate/README.md index 7b173147f1..79894c5a79 100644 --- a/substrate/README.md +++ b/substrate/README.md @@ -9,7 +9,7 @@ one. First, get Rust (1.26.1 or later) and the support software if you don't alr ``` curl https://sh.rustup.rs -sSf | sh -sudo apt install make clang +sudo apt install make clang pkg-config libssl-dev ``` Then, install Polkadot PoC-2: diff --git a/substrate/polkadot/cli/src/lib.rs b/substrate/polkadot/cli/src/lib.rs index 2552e36b42..1810922c7b 100644 --- a/substrate/polkadot/cli/src/lib.rs +++ b/substrate/polkadot/cli/src/lib.rs @@ -115,7 +115,7 @@ fn load_spec(matches: &clap::ArgMatches) -> Result<(service::ChainSpec, bool), S .map(ChainSpec::from) .unwrap_or_else(|| if matches.is_present("dev") { ChainSpec::Development } else { ChainSpec::KrummeLanke }); let is_global = match chain_spec { - ChainSpec::KrummeLanke | ChainSpec::StagingTestnet => true, + ChainSpec::KrummeLanke => true, _ => false, }; let spec = chain_spec.load()?; diff --git a/substrate/polkadot/transaction-pool/src/lib.rs b/substrate/polkadot/transaction-pool/src/lib.rs index 6d64d39e8d..265f72ba99 100644 --- a/substrate/polkadot/transaction-pool/src/lib.rs +++ b/substrate/polkadot/transaction-pool/src/lib.rs @@ -18,7 +18,7 @@ extern crate ed25519; extern crate substrate_client as client; extern crate substrate_codec as codec; extern crate substrate_extrinsic_pool as extrinsic_pool; -extern crate substrate_primitives as substrate_primitives; +extern crate substrate_primitives; extern crate substrate_runtime_primitives; extern crate polkadot_runtime as runtime; extern crate polkadot_primitives as primitives; @@ -279,13 +279,16 @@ impl<'a, A> txpool::Verifier for Verifier<'a, A> where type Error = Error; fn verify_transaction(&self, uxt: UncheckedExtrinsic) -> Result { - info!("Extrinsic Submitted: {:?}", uxt); if !uxt.is_signed() { bail!(ErrorKind::IsInherent(uxt)) } - let (encoded_size, hash) = uxt.using_encoded(|e| (e.len(), BlakeTwo256::hash(e))); + let encoded = uxt.encode(); + let (encoded_size, hash) = (encoded.len(), BlakeTwo256::hash(&encoded)); + + debug!(target: "transaction-pool", "Transaction submitted: {}", ::substrate_primitives::hexdisplay::HexDisplay::from(&encoded)); + let inner = match uxt.clone().check_with(|a| self.lookup(a)) { Ok(xt) => Some(xt), // keep the transaction around in the future pool and attempt to promote it later. @@ -294,6 +297,12 @@ impl<'a, A> txpool::Verifier for Verifier<'a, A> where }; let sender = inner.as_ref().map(|x| x.signed.clone()); + if encoded_size < 1024 { + info!(target: "transaction-pool", "Transaction verified: {} => {:?}", hash, uxt); + } else { + info!(target: "transaction-pool", "Transaction verified: {} ({} bytes is too large to display)", hash, encoded_size); + } + Ok(VerifiedTransaction { original: uxt, inner, diff --git a/substrate/substrate/primitives/src/hexdisplay.rs b/substrate/substrate/primitives/src/hexdisplay.rs index dbb90d4738..b1746727ae 100644 --- a/substrate/substrate/primitives/src/hexdisplay.rs +++ b/substrate/substrate/primitives/src/hexdisplay.rs @@ -26,8 +26,18 @@ impl<'a> HexDisplay<'a> { impl<'a> ::core::fmt::Display for HexDisplay<'a> { fn fmt(&self, fmtr: &mut ::core::fmt::Formatter) -> Result<(), ::core::fmt::Error> { - for byte in self.0 { - try!( fmtr.write_fmt(format_args!("{:02x}", byte))); + if self.0.len() < 1027 { + for byte in self.0 { + fmtr.write_fmt(format_args!("{:02x}", byte))?; + } + } else { + for byte in &self.0[0..512] { + fmtr.write_fmt(format_args!("{:02x}", byte))?; + } + fmtr.write_str("...")?; + for byte in &self.0[self.0.len() - 512..] { + fmtr.write_fmt(format_args!("{:02x}", byte))?; + } } Ok(()) } diff --git a/substrate/substrate/telemetry/src/lib.rs b/substrate/substrate/telemetry/src/lib.rs index e53cd10e89..dd5a30936d 100644 --- a/substrate/substrate/telemetry/src/lib.rs +++ b/substrate/substrate/telemetry/src/lib.rs @@ -31,7 +31,7 @@ extern crate log; extern crate slog; extern crate slog_scope; -use std::io; +use std::{io, time}; use parking_lot::Mutex; use slog::Drain; pub use slog_scope::with_logger; @@ -49,16 +49,15 @@ const CHANNEL_SIZE: usize = 262144; /// Initialise telemetry. pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard { + let client = ws::ClientBuilder::new(&config.url).ok().and_then(|mut x| x.connect(None).ok()); let log = slog::Logger::root( slog_async::Async::new( slog_json::Json::default( TelemetryWriter { buffer: vec![], - out: Mutex::new( - ws::ClientBuilder::new(&config.url).ok().and_then(|mut x| x.connect(None).ok()) - ), + out: Mutex::new(client), config, - first_time: true, // ensures that on_connect will be called. + last_time: None, // ensures that on_connect will be called. } ).fuse() ).chan_size(CHANNEL_SIZE) @@ -78,20 +77,47 @@ struct TelemetryWriter { buffer: Vec, out: Mutex>>>, config: TelemetryConfig, - first_time: bool, + last_time: Option, } +/// Every two minutes we reconnect to the telemetry server otherwise we don't get notified +/// of a flakey connection that has been dropped and needs to be reconnected. We can remove +/// this once we introduce a keepalive ping/pong. +const RECONNECT_PERIOD: u64 = 120; + impl TelemetryWriter { fn ensure_connected(&mut self) { - if self.first_time { - info!("Connected to telemetry server: {}", self.config.url); - (self.config.on_connect)(); - self.first_time = false; - } let mut client = self.out.lock(); - if client.is_none() { + + let controlled_disconnect = if let Some(t) = self.last_time { + if t.elapsed().as_secs() > RECONNECT_PERIOD && client.is_some() { + trace!(target: "telemetry", "Performing controlled drop of the telemetry connection."); + let _ = client.as_mut().and_then(|socket| + socket.send_message(&ws::Message::text("{\"msg\":\"system.reconnect\"}")).ok() + ); + *client = None; + true + } else { + false + } + } else { + false + }; + + let just_connected = if client.is_none() { + if !controlled_disconnect { + info!(target: "telemetry", "Connection dropped unexpectedly. Reconnecting to telemetry server..."); + } *client = ws::ClientBuilder::new(&self.config.url).ok().and_then(|mut x| x.connect(None).ok()); - drop(client); + client.is_some() + } else { + self.last_time.is_none() + }; + + drop(client); + if just_connected && !controlled_disconnect { + self.last_time = Some(time::Instant::now()); + info!("Reconnected to telemetry server: {}", self.config.url); (self.config.on_connect)(); } } @@ -113,7 +139,9 @@ impl io::Write for TelemetryWriter { let mut l = self.out.lock(); let socket_closed = if let Some(ref mut socket) = *l { if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) { - socket.send_message(&ws::Message::text(s)).is_err() + let r = socket.send_message(&ws::Message::text(s)); + trace!(target: "telemetry", "Sent to telemetry: {} -> {:?}", s, r); + r.is_err() } else { false } } else { false }; if socket_closed {