mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-19 18:11:03 +00:00
Various small v0.2 improvements (#367)
* Make telemetry less susceptible to flakey wifi * Update readme * Staging shouldn't autoconnect to telemetry * Don't try to output more than 1KB of hex to Display * Better logging of transactions * Grumbles * off-by-one
This commit is contained in:
+1
-1
@@ -9,7 +9,7 @@ one. First, get Rust (1.26.1 or later) and the support software if you don't alr
|
|||||||
|
|
||||||
```
|
```
|
||||||
curl https://sh.rustup.rs -sSf | sh
|
curl https://sh.rustup.rs -sSf | sh
|
||||||
sudo apt install make clang
|
sudo apt install make clang pkg-config libssl-dev
|
||||||
```
|
```
|
||||||
|
|
||||||
Then, install Polkadot PoC-2:
|
Then, install Polkadot PoC-2:
|
||||||
|
|||||||
@@ -115,7 +115,7 @@ fn load_spec(matches: &clap::ArgMatches) -> Result<(service::ChainSpec, bool), S
|
|||||||
.map(ChainSpec::from)
|
.map(ChainSpec::from)
|
||||||
.unwrap_or_else(|| if matches.is_present("dev") { ChainSpec::Development } else { ChainSpec::KrummeLanke });
|
.unwrap_or_else(|| if matches.is_present("dev") { ChainSpec::Development } else { ChainSpec::KrummeLanke });
|
||||||
let is_global = match chain_spec {
|
let is_global = match chain_spec {
|
||||||
ChainSpec::KrummeLanke | ChainSpec::StagingTestnet => true,
|
ChainSpec::KrummeLanke => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
};
|
};
|
||||||
let spec = chain_spec.load()?;
|
let spec = chain_spec.load()?;
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ extern crate ed25519;
|
|||||||
extern crate substrate_client as client;
|
extern crate substrate_client as client;
|
||||||
extern crate substrate_codec as codec;
|
extern crate substrate_codec as codec;
|
||||||
extern crate substrate_extrinsic_pool as extrinsic_pool;
|
extern crate substrate_extrinsic_pool as extrinsic_pool;
|
||||||
extern crate substrate_primitives as substrate_primitives;
|
extern crate substrate_primitives;
|
||||||
extern crate substrate_runtime_primitives;
|
extern crate substrate_runtime_primitives;
|
||||||
extern crate polkadot_runtime as runtime;
|
extern crate polkadot_runtime as runtime;
|
||||||
extern crate polkadot_primitives as primitives;
|
extern crate polkadot_primitives as primitives;
|
||||||
@@ -279,13 +279,16 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where
|
|||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn verify_transaction(&self, uxt: UncheckedExtrinsic) -> Result<Self::VerifiedTransaction> {
|
fn verify_transaction(&self, uxt: UncheckedExtrinsic) -> Result<Self::VerifiedTransaction> {
|
||||||
info!("Extrinsic Submitted: {:?}", uxt);
|
|
||||||
|
|
||||||
if !uxt.is_signed() {
|
if !uxt.is_signed() {
|
||||||
bail!(ErrorKind::IsInherent(uxt))
|
bail!(ErrorKind::IsInherent(uxt))
|
||||||
}
|
}
|
||||||
|
|
||||||
let (encoded_size, hash) = uxt.using_encoded(|e| (e.len(), BlakeTwo256::hash(e)));
|
let encoded = uxt.encode();
|
||||||
|
let (encoded_size, hash) = (encoded.len(), BlakeTwo256::hash(&encoded));
|
||||||
|
|
||||||
|
debug!(target: "transaction-pool", "Transaction submitted: {}", ::substrate_primitives::hexdisplay::HexDisplay::from(&encoded));
|
||||||
|
|
||||||
let inner = match uxt.clone().check_with(|a| self.lookup(a)) {
|
let inner = match uxt.clone().check_with(|a| self.lookup(a)) {
|
||||||
Ok(xt) => Some(xt),
|
Ok(xt) => Some(xt),
|
||||||
// keep the transaction around in the future pool and attempt to promote it later.
|
// keep the transaction around in the future pool and attempt to promote it later.
|
||||||
@@ -294,6 +297,12 @@ impl<'a, A> txpool::Verifier<UncheckedExtrinsic> for Verifier<'a, A> where
|
|||||||
};
|
};
|
||||||
let sender = inner.as_ref().map(|x| x.signed.clone());
|
let sender = inner.as_ref().map(|x| x.signed.clone());
|
||||||
|
|
||||||
|
if encoded_size < 1024 {
|
||||||
|
info!(target: "transaction-pool", "Transaction verified: {} => {:?}", hash, uxt);
|
||||||
|
} else {
|
||||||
|
info!(target: "transaction-pool", "Transaction verified: {} ({} bytes is too large to display)", hash, encoded_size);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(VerifiedTransaction {
|
Ok(VerifiedTransaction {
|
||||||
original: uxt,
|
original: uxt,
|
||||||
inner,
|
inner,
|
||||||
|
|||||||
@@ -26,8 +26,18 @@ impl<'a> HexDisplay<'a> {
|
|||||||
|
|
||||||
impl<'a> ::core::fmt::Display for HexDisplay<'a> {
|
impl<'a> ::core::fmt::Display for HexDisplay<'a> {
|
||||||
fn fmt(&self, fmtr: &mut ::core::fmt::Formatter) -> Result<(), ::core::fmt::Error> {
|
fn fmt(&self, fmtr: &mut ::core::fmt::Formatter) -> Result<(), ::core::fmt::Error> {
|
||||||
for byte in self.0 {
|
if self.0.len() < 1027 {
|
||||||
try!( fmtr.write_fmt(format_args!("{:02x}", byte)));
|
for byte in self.0 {
|
||||||
|
fmtr.write_fmt(format_args!("{:02x}", byte))?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for byte in &self.0[0..512] {
|
||||||
|
fmtr.write_fmt(format_args!("{:02x}", byte))?;
|
||||||
|
}
|
||||||
|
fmtr.write_str("...")?;
|
||||||
|
for byte in &self.0[self.0.len() - 512..] {
|
||||||
|
fmtr.write_fmt(format_args!("{:02x}", byte))?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ extern crate log;
|
|||||||
extern crate slog;
|
extern crate slog;
|
||||||
extern crate slog_scope;
|
extern crate slog_scope;
|
||||||
|
|
||||||
use std::io;
|
use std::{io, time};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use slog::Drain;
|
use slog::Drain;
|
||||||
pub use slog_scope::with_logger;
|
pub use slog_scope::with_logger;
|
||||||
@@ -49,16 +49,15 @@ const CHANNEL_SIZE: usize = 262144;
|
|||||||
|
|
||||||
/// Initialise telemetry.
|
/// Initialise telemetry.
|
||||||
pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard {
|
pub fn init_telemetry(config: TelemetryConfig) -> slog_scope::GlobalLoggerGuard {
|
||||||
|
let client = ws::ClientBuilder::new(&config.url).ok().and_then(|mut x| x.connect(None).ok());
|
||||||
let log = slog::Logger::root(
|
let log = slog::Logger::root(
|
||||||
slog_async::Async::new(
|
slog_async::Async::new(
|
||||||
slog_json::Json::default(
|
slog_json::Json::default(
|
||||||
TelemetryWriter {
|
TelemetryWriter {
|
||||||
buffer: vec![],
|
buffer: vec![],
|
||||||
out: Mutex::new(
|
out: Mutex::new(client),
|
||||||
ws::ClientBuilder::new(&config.url).ok().and_then(|mut x| x.connect(None).ok())
|
|
||||||
),
|
|
||||||
config,
|
config,
|
||||||
first_time: true, // ensures that on_connect will be called.
|
last_time: None, // ensures that on_connect will be called.
|
||||||
}
|
}
|
||||||
).fuse()
|
).fuse()
|
||||||
).chan_size(CHANNEL_SIZE)
|
).chan_size(CHANNEL_SIZE)
|
||||||
@@ -78,20 +77,47 @@ struct TelemetryWriter {
|
|||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
out: Mutex<Option<ws::sync::Client<Box<ws::stream::sync::NetworkStream + Send>>>>,
|
out: Mutex<Option<ws::sync::Client<Box<ws::stream::sync::NetworkStream + Send>>>>,
|
||||||
config: TelemetryConfig,
|
config: TelemetryConfig,
|
||||||
first_time: bool,
|
last_time: Option<time::Instant>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Every two minutes we reconnect to the telemetry server otherwise we don't get notified
|
||||||
|
/// of a flakey connection that has been dropped and needs to be reconnected. We can remove
|
||||||
|
/// this once we introduce a keepalive ping/pong.
|
||||||
|
const RECONNECT_PERIOD: u64 = 120;
|
||||||
|
|
||||||
impl TelemetryWriter {
|
impl TelemetryWriter {
|
||||||
fn ensure_connected(&mut self) {
|
fn ensure_connected(&mut self) {
|
||||||
if self.first_time {
|
|
||||||
info!("Connected to telemetry server: {}", self.config.url);
|
|
||||||
(self.config.on_connect)();
|
|
||||||
self.first_time = false;
|
|
||||||
}
|
|
||||||
let mut client = self.out.lock();
|
let mut client = self.out.lock();
|
||||||
if client.is_none() {
|
|
||||||
|
let controlled_disconnect = if let Some(t) = self.last_time {
|
||||||
|
if t.elapsed().as_secs() > RECONNECT_PERIOD && client.is_some() {
|
||||||
|
trace!(target: "telemetry", "Performing controlled drop of the telemetry connection.");
|
||||||
|
let _ = client.as_mut().and_then(|socket|
|
||||||
|
socket.send_message(&ws::Message::text("{\"msg\":\"system.reconnect\"}")).ok()
|
||||||
|
);
|
||||||
|
*client = None;
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
|
||||||
|
let just_connected = if client.is_none() {
|
||||||
|
if !controlled_disconnect {
|
||||||
|
info!(target: "telemetry", "Connection dropped unexpectedly. Reconnecting to telemetry server...");
|
||||||
|
}
|
||||||
*client = ws::ClientBuilder::new(&self.config.url).ok().and_then(|mut x| x.connect(None).ok());
|
*client = ws::ClientBuilder::new(&self.config.url).ok().and_then(|mut x| x.connect(None).ok());
|
||||||
drop(client);
|
client.is_some()
|
||||||
|
} else {
|
||||||
|
self.last_time.is_none()
|
||||||
|
};
|
||||||
|
|
||||||
|
drop(client);
|
||||||
|
if just_connected && !controlled_disconnect {
|
||||||
|
self.last_time = Some(time::Instant::now());
|
||||||
|
info!("Reconnected to telemetry server: {}", self.config.url);
|
||||||
(self.config.on_connect)();
|
(self.config.on_connect)();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -113,7 +139,9 @@ impl io::Write for TelemetryWriter {
|
|||||||
let mut l = self.out.lock();
|
let mut l = self.out.lock();
|
||||||
let socket_closed = if let Some(ref mut socket) = *l {
|
let socket_closed = if let Some(ref mut socket) = *l {
|
||||||
if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) {
|
if let Ok(s) = ::std::str::from_utf8(&self.buffer[..]) {
|
||||||
socket.send_message(&ws::Message::text(s)).is_err()
|
let r = socket.send_message(&ws::Message::text(s));
|
||||||
|
trace!(target: "telemetry", "Sent to telemetry: {} -> {:?}", s, r);
|
||||||
|
r.is_err()
|
||||||
} else { false }
|
} else { false }
|
||||||
} else { false };
|
} else { false };
|
||||||
if socket_closed {
|
if socket_closed {
|
||||||
|
|||||||
Reference in New Issue
Block a user