mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-08 11:18:01 +00:00
TelemetryEndpoints must be valid MutliAddr URL (#5069)
* Check for url validity when creating TelemetryEndpoints * Update code that used TelemetryEndpoints::new() * Update commennts that referred to TelemetryEndpoints::new() * Add tests for telemetry * Fix typo and fix code in docs * Return error on failing to override telemetry * Use expect instead of suppressing errors on must-be-valid telemetry endpoints * Update telemetry unit tests to use expect instead of unwrap * Implement custom deserializer for TelemetryEndpoints * Fix typo * Apply suggestions from code review Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -159,7 +159,8 @@ pub fn staging_testnet_config() -> ChainSpec {
|
||||
"staging_testnet",
|
||||
staging_testnet_config_genesis,
|
||||
boot_nodes,
|
||||
Some(TelemetryEndpoints::new(vec![(STAGING_TELEMETRY_URL.to_string(), 0)])),
|
||||
Some(TelemetryEndpoints::new(vec![(STAGING_TELEMETRY_URL.to_string(), 0)])
|
||||
.expect("Staging telemetry url is valid; qed")),
|
||||
None,
|
||||
None,
|
||||
Default::default(),
|
||||
|
||||
@@ -419,7 +419,7 @@ impl RunCmd {
|
||||
config.telemetry_endpoints = None;
|
||||
} else if !self.telemetry_endpoints.is_empty() {
|
||||
config.telemetry_endpoints = Some(
|
||||
TelemetryEndpoints::new(self.telemetry_endpoints.clone())
|
||||
TelemetryEndpoints::new(self.telemetry_endpoints.clone()).map_err(|e| e.to_string())?
|
||||
);
|
||||
}
|
||||
|
||||
@@ -689,7 +689,8 @@ mod tests {
|
||||
"test-id",
|
||||
|| (),
|
||||
vec!["boo".to_string()],
|
||||
Some(TelemetryEndpoints::new(vec![("foo".to_string(), 42)])),
|
||||
Some(TelemetryEndpoints::new(vec![("wss://foo/bar".to_string(), 42)])
|
||||
.expect("provided url should be valid")),
|
||||
None,
|
||||
None,
|
||||
None::<()>,
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
//! `slog_scope::with_logger` followed with `slog_log!`.
|
||||
//!
|
||||
//! Note that you are supposed to only ever use `telemetry!` and not `slog_scope::with_logger` at
|
||||
//! the moment. Substate may eventually be reworked to get proper `slog` support, including sending
|
||||
//! the moment. Substrate may eventually be reworked to get proper `slog` support, including sending
|
||||
//! information to the telemetry.
|
||||
//!
|
||||
//! The [`Telemetry`] struct implements `Stream` and must be polled regularly (or sent to a
|
||||
@@ -41,7 +41,7 @@
|
||||
//! endpoints: sc_telemetry::TelemetryEndpoints::new(vec![
|
||||
//! // The `0` is the maximum verbosity level of messages to send to this endpoint.
|
||||
//! ("wss://example.com".into(), 0)
|
||||
//! ]),
|
||||
//! ]).expect("Invalid URL or multiaddr provided"),
|
||||
//! // Can be used to pass an external implementation of WebSockets.
|
||||
//! wasm_external_transport: None,
|
||||
//! });
|
||||
@@ -62,7 +62,7 @@ use futures::{prelude::*, channel::mpsc};
|
||||
use libp2p::{Multiaddr, wasm_ext};
|
||||
use log::{error, warn};
|
||||
use parking_lot::Mutex;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use serde::{Serialize, Deserialize, Deserializer};
|
||||
use std::{pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration};
|
||||
use wasm_timer::Instant;
|
||||
|
||||
@@ -96,14 +96,49 @@ pub struct TelemetryConfig {
|
||||
///
|
||||
/// The URL string can be either a URL or a multiaddress.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct TelemetryEndpoints(Vec<(String, u8)>);
|
||||
pub struct TelemetryEndpoints(
|
||||
#[serde(deserialize_with = "url_or_multiaddr_deser")]
|
||||
Vec<(Multiaddr, u8)>
|
||||
);
|
||||
|
||||
/// Custom deserializer for TelemetryEndpoints, used to convert urls or multiaddr to multiaddr.
|
||||
fn url_or_multiaddr_deser<'de, D>(deserializer: D) -> Result<Vec<(Multiaddr, u8)>, D::Error>
|
||||
where D: Deserializer<'de>
|
||||
{
|
||||
Vec::<(String, u8)>::deserialize(deserializer)?
|
||||
.iter()
|
||||
.map(|e| Ok((url_to_multiaddr(&e.0)
|
||||
.map_err(serde::de::Error::custom)?, e.1)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
impl TelemetryEndpoints {
|
||||
pub fn new(endpoints: Vec<(String, u8)>) -> Self {
|
||||
TelemetryEndpoints(endpoints)
|
||||
pub fn new(endpoints: Vec<(String, u8)>) -> Result<Self, libp2p::multiaddr::Error> {
|
||||
let endpoints: Result<Vec<(Multiaddr, u8)>, libp2p::multiaddr::Error> = endpoints.iter()
|
||||
.map(|e| Ok((url_to_multiaddr(&e.0)?, e.1)))
|
||||
.collect();
|
||||
endpoints.map(Self)
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a WebSocket URL into a libp2p `Multiaddr`.
|
||||
fn url_to_multiaddr(url: &str) -> Result<Multiaddr, libp2p::multiaddr::Error> {
|
||||
// First, assume that we have a `Multiaddr`.
|
||||
let parse_error = match url.parse() {
|
||||
Ok(ma) => return Ok(ma),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
// If not, try the `ws://path/url` format.
|
||||
if let Ok(ma) = libp2p::multiaddr::from_url(url) {
|
||||
return Ok(ma)
|
||||
}
|
||||
|
||||
// If we have no clue about the format of that string, assume that we were expecting a
|
||||
// `Multiaddr`.
|
||||
Err(parse_error)
|
||||
}
|
||||
|
||||
/// Log levels.
|
||||
pub const SUBSTRATE_DEBUG: &str = "9";
|
||||
pub const SUBSTRATE_INFO: &str = "0";
|
||||
@@ -149,13 +184,7 @@ struct TelemetryDrain {
|
||||
/// doesn't provide any way of knowing whether a global logger has already been registered.
|
||||
pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
|
||||
// Build the list of telemetry endpoints.
|
||||
let mut endpoints = Vec::new();
|
||||
for &(ref url, verbosity) in &config.endpoints.0 {
|
||||
match url_to_multiaddr(url) {
|
||||
Ok(addr) => endpoints.push((addr, verbosity)),
|
||||
Err(err) => warn!(target: "telemetry", "Invalid telemetry URL {}: {}", url, err),
|
||||
}
|
||||
}
|
||||
let (endpoints, wasm_external_transport) = (config.endpoints.0, config.wasm_external_transport);
|
||||
|
||||
let (sender, receiver) = mpsc::channel(16);
|
||||
let guard = {
|
||||
@@ -164,7 +193,7 @@ pub fn init_telemetry(config: TelemetryConfig) -> Telemetry {
|
||||
slog_scope::set_global_logger(root)
|
||||
};
|
||||
|
||||
let worker = match worker::TelemetryWorker::new(endpoints, config.wasm_external_transport) {
|
||||
let worker = match worker::TelemetryWorker::new(endpoints, wasm_external_transport) {
|
||||
Ok(w) => Some(w),
|
||||
Err(err) => {
|
||||
error!(target: "telemetry", "Failed to initialize telemetry worker: {:?}", err);
|
||||
@@ -271,24 +300,6 @@ impl slog::Drain for TelemetryDrain {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses a WebSocket URL into a libp2p `Multiaddr`.
|
||||
fn url_to_multiaddr(url: &str) -> Result<Multiaddr, libp2p::multiaddr::Error> {
|
||||
// First, assume that we have a `Multiaddr`.
|
||||
let parse_error = match url.parse() {
|
||||
Ok(ma) => return Ok(ma),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
// If not, try the `ws://path/url` format.
|
||||
if let Ok(ma) = libp2p::multiaddr::from_url(url) {
|
||||
return Ok(ma)
|
||||
}
|
||||
|
||||
// If we have no clue about the format of that string, assume that we were expecting a
|
||||
// `Multiaddr`.
|
||||
Err(parse_error)
|
||||
}
|
||||
|
||||
/// Translates to `slog_scope::info`, but contains an additional verbosity
|
||||
/// parameter which the log record is tagged with. Additionally the verbosity
|
||||
/// parameter is added to the record as a key-value pair.
|
||||
@@ -300,3 +311,35 @@ macro_rules! telemetry {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod telemetry_endpoints_tests {
|
||||
use libp2p::Multiaddr;
|
||||
use super::TelemetryEndpoints;
|
||||
use super::url_to_multiaddr;
|
||||
|
||||
#[test]
|
||||
fn valid_endpoints() {
|
||||
let endp = vec![("wss://telemetry.polkadot.io/submit/".into(), 3), ("/ip4/80.123.90.4/tcp/5432".into(), 4)];
|
||||
let telem = TelemetryEndpoints::new(endp.clone()).expect("Telemetry endpoint should be valid");
|
||||
let mut res: Vec<(Multiaddr, u8)> = vec![];
|
||||
for (a, b) in endp.iter() {
|
||||
res.push((url_to_multiaddr(a).expect("provided url should be valid"), *b))
|
||||
}
|
||||
assert_eq!(telem.0, res);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_endpoints() {
|
||||
let endp = vec![("/ip4/...80.123.90.4/tcp/5432".into(), 3), ("/ip4/no:!?;rlkqre;;::::///tcp/5432".into(), 4)];
|
||||
let telem = TelemetryEndpoints::new(endp);
|
||||
assert!(telem.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn valid_and_invalid_endpoints() {
|
||||
let endp = vec![("/ip4/80.123.90.4/tcp/5432".into(), 3), ("/ip4/no:!?;rlkqre;;::::///tcp/5432".into(), 4)];
|
||||
let telem = TelemetryEndpoints::new(endp);
|
||||
assert!(telem.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user