Remove messed up bridges subtree

This commit is contained in:
Hernando Castano
2021-04-21 11:55:51 -04:00
parent 142a0aec2f
commit 86a376cd69
288 changed files with 0 additions and 64158 deletions
-22
View File
@@ -1,22 +0,0 @@
[package]
name = "relay-utils"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
[dependencies]
ansi_term = "0.12"
async-std = "1.6.5"
async-trait = "0.1.40"
backoff = "0.2"
env_logger = "0.8.2"
futures = "0.3.5"
log = "0.4.11"
num-traits = "0.2"
sysinfo = "0.15"
time = "0.2"
# Substrate dependencies
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -1,59 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Relayer initialization functions.
use std::io::Write;
/// Initialize relay environment.
pub fn initialize_relay() {
let mut builder = env_logger::Builder::new();
let filters = match std::env::var("RUST_LOG") {
Ok(env_filters) => format!("bridge=info,{}", env_filters),
Err(_) => "bridge=info".into(),
};
builder.parse_filters(&filters);
builder.format(move |buf, record| {
writeln!(buf, "{}", {
let timestamp = time::OffsetDateTime::try_now_local()
.unwrap_or_else(|_| time::OffsetDateTime::now_utc())
.format("%Y-%m-%d %H:%M:%S %z");
if cfg!(windows) {
format!("{} {} {} {}", timestamp, record.level(), record.target(), record.args())
} else {
use ansi_term::Colour as Color;
let log_level = match record.level() {
log::Level::Error => Color::Fixed(9).bold().paint(record.level().to_string()),
log::Level::Warn => Color::Fixed(11).bold().paint(record.level().to_string()),
log::Level::Info => Color::Fixed(10).paint(record.level().to_string()),
log::Level::Debug => Color::Fixed(14).paint(record.level().to_string()),
log::Level::Trace => Color::Fixed(12).paint(record.level().to_string()),
};
format!(
"{} {} {} {}",
Color::Fixed(8).bold().paint(timestamp),
log_level,
Color::Fixed(8).paint(record.target()),
record.args()
)
}
})
});
builder.init();
}
-275
View File
@@ -1,275 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
//! Utilities used by different relays.
use backoff::{backoff::Backoff, ExponentialBackoff};
use futures::future::FutureExt;
use std::time::Duration;
/// Max delay after connection-unrelated error happened before we'll try the
/// same request again.
pub const MAX_BACKOFF_INTERVAL: Duration = Duration::from_secs(60);
/// Delay after connection-related error happened before we'll try
/// reconnection again.
pub const CONNECTION_ERROR_DELAY: Duration = Duration::from_secs(10);
pub mod initialize;
pub mod metrics;
pub mod relay_loop;
/// Block number traits shared by all chains that relay is able to serve.
pub trait BlockNumberBase:
'static
+ From<u32>
+ Into<u64>
+ Ord
+ Clone
+ Copy
+ Default
+ Send
+ Sync
+ std::fmt::Debug
+ std::fmt::Display
+ std::hash::Hash
+ std::ops::Add<Output = Self>
+ std::ops::Sub<Output = Self>
+ num_traits::CheckedSub
+ num_traits::Saturating
+ num_traits::Zero
+ num_traits::One
{
}
impl<T> BlockNumberBase for T where
T: 'static
+ From<u32>
+ Into<u64>
+ Ord
+ Clone
+ Copy
+ Default
+ Send
+ Sync
+ std::fmt::Debug
+ std::fmt::Display
+ std::hash::Hash
+ std::ops::Add<Output = Self>
+ std::ops::Sub<Output = Self>
+ num_traits::CheckedSub
+ num_traits::Saturating
+ num_traits::Zero
+ num_traits::One
{
}
/// Macro that returns (client, Err(error)) tuple from function if result is Err(error).
#[macro_export]
macro_rules! bail_on_error {
($result: expr) => {
match $result {
(client, Ok(result)) => (client, result),
(client, Err(error)) => return (client, Err(error)),
}
};
}
/// Macro that returns (client, Err(error)) tuple from function if result is Err(error).
#[macro_export]
macro_rules! bail_on_arg_error {
($result: expr, $client: ident) => {
match $result {
Ok(result) => result,
Err(error) => return ($client, Err(error)),
}
};
}
/// Ethereum header Id.
#[derive(Debug, Default, Clone, Copy, Eq, Hash, PartialEq)]
pub struct HeaderId<Hash, Number>(pub Number, pub Hash);
/// Error type that can signal connection errors.
pub trait MaybeConnectionError {
/// Returns true if error (maybe) represents connection error.
fn is_connection_error(&self) -> bool;
}
/// Stringified error that may be either connection-related or not.
#[derive(Debug)]
pub enum StringifiedMaybeConnectionError {
/// The error is connection-related error.
Connection(String),
/// The error is connection-unrelated error.
NonConnection(String),
}
impl StringifiedMaybeConnectionError {
/// Create new stringified connection error.
pub fn new(is_connection_error: bool, error: String) -> Self {
if is_connection_error {
StringifiedMaybeConnectionError::Connection(error)
} else {
StringifiedMaybeConnectionError::NonConnection(error)
}
}
}
impl MaybeConnectionError for StringifiedMaybeConnectionError {
fn is_connection_error(&self) -> bool {
match *self {
StringifiedMaybeConnectionError::Connection(_) => true,
StringifiedMaybeConnectionError::NonConnection(_) => false,
}
}
}
impl ToString for StringifiedMaybeConnectionError {
fn to_string(&self) -> String {
match *self {
StringifiedMaybeConnectionError::Connection(ref err) => err.clone(),
StringifiedMaybeConnectionError::NonConnection(ref err) => err.clone(),
}
}
}
/// Exponential backoff for connection-unrelated errors retries.
pub fn retry_backoff() -> ExponentialBackoff {
ExponentialBackoff {
// we do not want relayer to stop
max_elapsed_time: None,
max_interval: MAX_BACKOFF_INTERVAL,
..Default::default()
}
}
/// Compact format of IDs vector.
pub fn format_ids<Id: std::fmt::Debug>(mut ids: impl ExactSizeIterator<Item = Id>) -> String {
const NTH_PROOF: &str = "we have checked len; qed";
match ids.len() {
0 => "<nothing>".into(),
1 => format!("{:?}", ids.next().expect(NTH_PROOF)),
2 => {
let id0 = ids.next().expect(NTH_PROOF);
let id1 = ids.next().expect(NTH_PROOF);
format!("[{:?}, {:?}]", id0, id1)
}
len => {
let id0 = ids.next().expect(NTH_PROOF);
let id_last = ids.last().expect(NTH_PROOF);
format!("{}:[{:?} ... {:?}]", len, id0, id_last)
}
}
}
/// Stream that emits item every `timeout_ms` milliseconds.
pub fn interval(timeout: Duration) -> impl futures::Stream<Item = ()> {
futures::stream::unfold((), move |_| async move {
async_std::task::sleep(timeout).await;
Some(((), ()))
})
}
/// Which client has caused error.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FailedClient {
/// It is the source client who has caused error.
Source,
/// It is the target client who has caused error.
Target,
/// Both clients are failing, or we just encountered some other error that
/// should be treated like that.
Both,
}
/// Future process result.
#[derive(Debug, Clone, Copy)]
pub enum ProcessFutureResult {
/// Future has been processed successfully.
Success,
/// Future has failed with non-connection error.
Failed,
/// Future has failed with connection error.
ConnectionFailed,
}
impl ProcessFutureResult {
/// Returns true if result is Success.
pub fn is_ok(self) -> bool {
match self {
ProcessFutureResult::Success => true,
ProcessFutureResult::Failed | ProcessFutureResult::ConnectionFailed => false,
}
}
/// Returns Ok(true) if future has succeeded.
/// Returns Ok(false) if future has failed with non-connection error.
/// Returns Err if future is `ConnectionFailed`.
pub fn fail_if_connection_error(self, failed_client: FailedClient) -> Result<bool, FailedClient> {
match self {
ProcessFutureResult::Success => Ok(true),
ProcessFutureResult::Failed => Ok(false),
ProcessFutureResult::ConnectionFailed => Err(failed_client),
}
}
}
/// Process result of the future from a client.
pub fn process_future_result<TResult, TError, TGoOfflineFuture>(
result: Result<TResult, TError>,
retry_backoff: &mut ExponentialBackoff,
on_success: impl FnOnce(TResult),
go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>,
go_offline: impl FnOnce(Duration) -> TGoOfflineFuture,
error_pattern: impl FnOnce() -> String,
) -> ProcessFutureResult
where
TError: std::fmt::Debug + MaybeConnectionError,
TGoOfflineFuture: FutureExt,
{
match result {
Ok(result) => {
on_success(result);
retry_backoff.reset();
ProcessFutureResult::Success
}
Err(error) if error.is_connection_error() => {
log::error!(
target: "bridge",
"{}: {:?}. Going to restart",
error_pattern(),
error,
);
retry_backoff.reset();
go_offline_future.set(go_offline(CONNECTION_ERROR_DELAY).fuse());
ProcessFutureResult::ConnectionFailed
}
Err(error) => {
let retry_delay = retry_backoff.next_backoff().unwrap_or(CONNECTION_ERROR_DELAY);
log::error!(
target: "bridge",
"{}: {:?}. Retrying in {}",
error_pattern(),
error,
retry_delay.as_secs_f64(),
);
go_offline_future.set(go_offline(retry_delay).fuse());
ProcessFutureResult::Failed
}
}
}
@@ -1,168 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
pub use substrate_prometheus_endpoint::{register, Counter, CounterVec, Gauge, GaugeVec, Opts, Registry, F64, U64};
use async_std::sync::{Arc, Mutex};
use std::net::SocketAddr;
use substrate_prometheus_endpoint::init_prometheus;
use sysinfo::{ProcessExt, RefreshKind, System, SystemExt};
/// Prometheus endpoint MetricsParams.
#[derive(Debug, Clone)]
pub struct MetricsParams {
/// Serve HTTP requests at given host.
pub host: String,
/// Serve HTTP requests at given port.
pub port: u16,
}
/// Metrics API.
pub trait Metrics {
/// Register metrics in the registry.
fn register(&self, registry: &Registry) -> Result<(), String>;
}
/// Global Prometheus metrics.
#[derive(Debug, Clone)]
pub struct GlobalMetrics {
system: Arc<Mutex<System>>,
system_average_load: GaugeVec<F64>,
process_cpu_usage_percentage: Gauge<F64>,
process_memory_usage_bytes: Gauge<U64>,
}
/// Start Prometheus endpoint with given metrics registry.
pub fn start(
prefix: String,
params: Option<MetricsParams>,
global_metrics: &GlobalMetrics,
extra_metrics: &impl Metrics,
) {
let params = match params {
Some(params) => params,
None => return,
};
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
let do_start = move || {
let prometheus_socket_addr = SocketAddr::new(
params
.host
.parse()
.map_err(|err| format!("Invalid Prometheus host {}: {}", params.host, err))?,
params.port,
);
let metrics_registry =
Registry::new_custom(Some(prefix), None).expect("only fails if prefix is empty; prefix is not empty; qed");
global_metrics.register(&metrics_registry)?;
extra_metrics.register(&metrics_registry)?;
async_std::task::spawn(async move {
init_prometheus(prometheus_socket_addr, metrics_registry)
.await
.map_err(|err| format!("Error starting Prometheus endpoint: {}", err))
});
Ok(())
};
let result: Result<(), String> = do_start();
if let Err(err) = result {
log::warn!(
target: "bridge",
"Failed to expose metrics: {}",
err,
);
}
}
impl Default for MetricsParams {
fn default() -> Self {
MetricsParams {
host: "127.0.0.1".into(),
port: 9616,
}
}
}
impl Metrics for GlobalMetrics {
fn register(&self, registry: &Registry) -> Result<(), String> {
register(self.system_average_load.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_cpu_usage_percentage.clone(), registry).map_err(|e| e.to_string())?;
register(self.process_memory_usage_bytes.clone(), registry).map_err(|e| e.to_string())?;
Ok(())
}
}
impl Default for GlobalMetrics {
fn default() -> Self {
GlobalMetrics {
system: Arc::new(Mutex::new(System::new_with_specifics(RefreshKind::everything()))),
system_average_load: GaugeVec::new(Opts::new("system_average_load", "System load average"), &["over"])
.expect("metric is static and thus valid; qed"),
process_cpu_usage_percentage: Gauge::new("process_cpu_usage_percentage", "Process CPU usage")
.expect("metric is static and thus valid; qed"),
process_memory_usage_bytes: Gauge::new(
"process_memory_usage_bytes",
"Process memory (resident set size) usage",
)
.expect("metric is static and thus valid; qed"),
}
}
}
impl GlobalMetrics {
/// Update metrics.
pub async fn update(&self) {
// update system-wide metrics
let mut system = self.system.lock().await;
let load = system.get_load_average();
self.system_average_load.with_label_values(&["1min"]).set(load.one);
self.system_average_load.with_label_values(&["5min"]).set(load.five);
self.system_average_load.with_label_values(&["15min"]).set(load.fifteen);
// update process-related metrics
let pid = sysinfo::get_current_pid().expect(
"only fails where pid is unavailable (os=unknown || arch=wasm32);\
relay is not supposed to run in such MetricsParamss;\
qed",
);
let is_process_refreshed = system.refresh_process(pid);
match (is_process_refreshed, system.get_process(pid)) {
(true, Some(process_info)) => {
let cpu_usage = process_info.cpu_usage() as f64;
let memory_usage = process_info.memory() * 1024;
log::trace!(
target: "bridge-metrics",
"Refreshed process metrics: CPU={}, memory={}",
cpu_usage,
memory_usage,
);
self.process_cpu_usage_percentage
.set(if cpu_usage.is_finite() { cpu_usage } else { 0f64 });
self.process_memory_usage_bytes.set(memory_usage);
}
_ => {
log::warn!(
target: "bridge",
"Failed to refresh process information. Metrics may show obsolete values",
);
}
}
}
}
@@ -1,95 +0,0 @@
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{FailedClient, MaybeConnectionError};
use async_trait::async_trait;
use std::{fmt::Debug, future::Future, time::Duration};
/// Default pause between reconnect attempts.
pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);
/// Basic blockchain client from relay perspective.
#[async_trait]
pub trait Client: Clone + Send + Sync {
/// Type of error this clients returns.
type Error: Debug + MaybeConnectionError;
/// Try to reconnect to source node.
async fn reconnect(&mut self) -> Result<(), Self::Error>;
}
/// Run relay loop.
///
/// This function represents an outer loop, which in turn calls provided `loop_run` function to do
/// actual job. When `loop_run` returns, this outer loop reconnects to failed client (source,
/// target or both) and calls `loop_run` again.
pub fn run<SC: Client, TC: Client, R, F>(
reconnect_delay: Duration,
mut source_client: SC,
mut target_client: TC,
loop_run: R,
) where
R: Fn(SC, TC) -> F,
F: Future<Output = Result<(), FailedClient>>,
{
let mut local_pool = futures::executor::LocalPool::new();
local_pool.run_until(async move {
loop {
let result = loop_run(source_client.clone(), target_client.clone()).await;
match result {
Ok(()) => break,
Err(failed_client) => loop {
async_std::task::sleep(reconnect_delay).await;
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
match source_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
match target_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
reconnect_delay.as_secs(),
error,
);
continue;
}
}
}
break;
},
}
log::debug!(target: "bridge", "Restarting relay loop");
}
});
}