Prefix in relay loops logs (#920)

* prefix in relay loops logs

* fmt
This commit is contained in:
Svyatoslav Nikolsky
2021-04-22 14:44:39 +03:00
committed by Bastian Köcher
parent 25a4151a8b
commit 63ce1b5973
14 changed files with 137 additions and 89 deletions
@@ -96,7 +96,7 @@ where
SourceChain: Clone + Chain + Debug, SourceChain: Clone + Chain + Debug,
BlockNumberOf<SourceChain>: BlockNumberBase, BlockNumberOf<SourceChain>: BlockNumberBase,
TargetChain: Clone + Chain + Debug, TargetChain: Clone + Chain + Debug,
TargetSign: Clone + Send + Sync, TargetSign: 'static + Clone + Send + Sync,
{ {
const SOURCE_NAME: &'static str = SourceChain::NAME; const SOURCE_NAME: &'static str = SourceChain::NAME;
const TARGET_NAME: &'static str = TargetChain::NAME; const TARGET_NAME: &'static str = TargetChain::NAME;
@@ -93,7 +93,7 @@ impl<C, P, R, I> RelayClient for SubstrateMessagesSource<C, P, R, I>
where where
C: Chain, C: Chain,
P: SubstrateMessageLane, P: SubstrateMessageLane,
R: Send + Sync, R: 'static + Send + Sync,
I: Send + Sync + Instance, I: Send + Sync + Instance,
{ {
type Error = SubstrateError; type Error = SubstrateError;
@@ -93,7 +93,7 @@ impl<C, P, R, I> RelayClient for SubstrateMessagesTarget<C, P, R, I>
where where
C: Chain, C: Chain,
P: SubstrateMessageLane, P: SubstrateMessageLane,
R: Send + Sync, R: 'static + Send + Sync,
I: Send + Sync + Instance, I: Send + Sync + Instance,
{ {
type Error = SubstrateError; type Error = SubstrateError;
+9 -6
View File
@@ -26,7 +26,7 @@ use std::{
}; };
/// Transaction proof pipeline. /// Transaction proof pipeline.
pub trait TransactionProofPipeline { pub trait TransactionProofPipeline: 'static {
/// Name of the transaction proof source. /// Name of the transaction proof source.
const SOURCE_NAME: &'static str; const SOURCE_NAME: &'static str;
/// Name of the transaction proof target. /// Name of the transaction proof target.
@@ -35,18 +35,21 @@ pub trait TransactionProofPipeline {
/// Block type. /// Block type.
type Block: SourceBlock; type Block: SourceBlock;
/// Transaction inclusion proof type. /// Transaction inclusion proof type.
type TransactionProof; type TransactionProof: 'static + Send + Sync;
} }
/// Block that is participating in exchange. /// Block that is participating in exchange.
pub trait SourceBlock { pub trait SourceBlock: 'static + Send + Sync {
/// Block hash type. /// Block hash type.
type Hash: Clone + Debug + Display; type Hash: 'static + Clone + Send + Sync + Debug + Display;
/// Block number type. /// Block number type.
type Number: Debug type Number: 'static
+ Debug
+ Display + Display
+ Clone + Clone
+ Copy + Copy
+ Send
+ Sync
+ Into<u64> + Into<u64>
+ std::cmp::Ord + std::cmp::Ord
+ std::ops::Add<Output = Self::Number> + std::ops::Add<Output = Self::Number>
@@ -61,7 +64,7 @@ pub trait SourceBlock {
} }
/// Transaction that is participating in exchange. /// Transaction that is participating in exchange.
pub trait SourceTransaction { pub trait SourceTransaction: 'static + Send {
/// Transaction hash type. /// Transaction hash type.
type Hash: Debug + Display; type Hash: Debug + Display;
+6 -6
View File
@@ -39,9 +39,9 @@ pub struct TransactionProofsRelayState<BlockNumber> {
} }
/// Transactions proofs relay storage. /// Transactions proofs relay storage.
pub trait TransactionProofsRelayStorage: Clone { pub trait TransactionProofsRelayStorage: 'static + Clone + Send + Sync {
/// Associated block number. /// Associated block number.
type BlockNumber; type BlockNumber: 'static + Send + Sync;
/// Get relay state. /// Get relay state.
fn state(&self) -> TransactionProofsRelayState<Self::BlockNumber>; fn state(&self) -> TransactionProofsRelayState<Self::BlockNumber>;
@@ -64,7 +64,7 @@ impl<BlockNumber> InMemoryStorage<BlockNumber> {
} }
} }
impl<BlockNumber: Clone + Copy> TransactionProofsRelayStorage for InMemoryStorage<BlockNumber> { impl<BlockNumber: 'static + Clone + Copy + Send + Sync> TransactionProofsRelayStorage for InMemoryStorage<BlockNumber> {
type BlockNumber = BlockNumber; type BlockNumber = BlockNumber;
fn state(&self) -> TransactionProofsRelayState<BlockNumber> { fn state(&self) -> TransactionProofsRelayState<BlockNumber> {
@@ -89,7 +89,7 @@ pub async fn run<P: TransactionProofPipeline>(
source_client: impl SourceClient<P>, source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>, target_client: impl TargetClient<P>,
metrics_params: MetricsParams, metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> { ) -> Result<(), String> {
let exit_signal = exit_signal.shared(); let exit_signal = exit_signal.shared();
@@ -99,7 +99,7 @@ pub async fn run<P: TransactionProofPipeline>(
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose() .expose()
.await? .await?
.run(|source_client, target_client, metrics| { .run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost( run_until_connection_lost(
storage.clone(), storage.clone(),
source_client, source_client,
@@ -117,7 +117,7 @@ async fn run_until_connection_lost<P: TransactionProofPipeline>(
source_client: impl SourceClient<P>, source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>, target_client: impl TargetClient<P>,
metrics_exch: Option<ExchangeLoopMetrics>, metrics_exch: Option<ExchangeLoopMetrics>,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient> { ) -> Result<(), FailedClient> {
let mut retry_backoff = retry_backoff(); let mut retry_backoff = retry_backoff();
let mut state = storage.state(); let mut state = storage.state();
+3 -3
View File
@@ -65,7 +65,7 @@ pub struct FinalitySyncParams {
pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient { pub trait SourceClient<P: FinalitySyncPipeline>: RelayClient {
/// Stream of new finality proofs. The stream is allowed to miss proofs for some /// Stream of new finality proofs. The stream is allowed to miss proofs for some
/// headers, even if those headers are mandatory. /// headers, even if those headers are mandatory.
type FinalityProofsStream: Stream<Item = P::FinalityProof>; type FinalityProofsStream: Stream<Item = P::FinalityProof> + Send;
/// Get best finalized block number. /// Get best finalized block number.
async fn best_finalized_block_number(&self) -> Result<P::Number, Self::Error>; async fn best_finalized_block_number(&self) -> Result<P::Number, Self::Error>;
@@ -101,7 +101,7 @@ pub async fn run<P: FinalitySyncPipeline>(
target_client: impl TargetClient<P>, target_client: impl TargetClient<P>,
sync_params: FinalitySyncParams, sync_params: FinalitySyncParams,
metrics_params: MetricsParams, metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> { ) -> Result<(), String> {
let exit_signal = exit_signal.shared(); let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client) relay_utils::relay_loop(source_client, target_client)
@@ -110,7 +110,7 @@ pub async fn run<P: FinalitySyncPipeline>(
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose() .expose()
.await? .await?
.run(|source_client, target_client, metrics| { .run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost( run_until_connection_lost(
source_client, source_client,
target_client, target_client,
@@ -106,7 +106,7 @@ impl RelayClient for TestSourceClient {
#[async_trait] #[async_trait]
impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient { impl SourceClient<TestFinalitySyncPipeline> for TestSourceClient {
type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof>>>; type FinalityProofsStream = Pin<Box<dyn Stream<Item = TestFinalityProof> + 'static + Send>>;
async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> { async fn best_finalized_block_number(&self) -> Result<TestNumber, TestError> {
let mut data = self.data.lock(); let mut data = self.data.lock();
+1 -1
View File
@@ -28,7 +28,7 @@ mod finality_loop;
mod finality_loop_tests; mod finality_loop_tests;
/// Finality proofs synchronization pipeline. /// Finality proofs synchronization pipeline.
pub trait FinalitySyncPipeline: Clone + Debug + Send + Sync { pub trait FinalitySyncPipeline: 'static + Clone + Debug + Send + Sync {
/// Name of the finality proofs source. /// Name of the finality proofs source.
const SOURCE_NAME: &'static str; const SOURCE_NAME: &'static str;
/// Name of the finality proofs target. /// Name of the finality proofs target.
+4 -4
View File
@@ -102,7 +102,7 @@ pub trait TargetClient<P: HeadersSyncPipeline>: RelayClient {
/// Synchronization maintain procedure. /// Synchronization maintain procedure.
#[async_trait] #[async_trait]
pub trait SyncMaintain<P: HeadersSyncPipeline>: Clone + Send + Sync { pub trait SyncMaintain<P: HeadersSyncPipeline>: 'static + Clone + Send + Sync {
/// Run custom maintain procedures. This is guaranteed to be called when both source and target /// Run custom maintain procedures. This is guaranteed to be called when both source and target
/// clients are unoccupied. /// clients are unoccupied.
async fn maintain(&self, _sync: &mut HeadersSync<P>) {} async fn maintain(&self, _sync: &mut HeadersSync<P>) {}
@@ -125,7 +125,7 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
sync_maintain: impl SyncMaintain<P>, sync_maintain: impl SyncMaintain<P>,
sync_params: HeadersSyncParams, sync_params: HeadersSyncParams,
metrics_params: MetricsParams, metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), String> { ) -> Result<(), String> {
let exit_signal = exit_signal.shared(); let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client) relay_utils::relay_loop(source_client, target_client)
@@ -134,7 +134,7 @@ pub async fn run<P: HeadersSyncPipeline, TC: TargetClient<P>>(
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose() .expose()
.await? .await?
.run(|source_client, target_client, metrics| { .run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost( run_until_connection_lost(
source_client, source_client,
source_tick, source_tick,
@@ -159,7 +159,7 @@ async fn run_until_connection_lost<P: HeadersSyncPipeline, TC: TargetClient<P>>(
sync_maintain: impl SyncMaintain<P>, sync_maintain: impl SyncMaintain<P>,
sync_params: HeadersSyncParams, sync_params: HeadersSyncParams,
metrics_sync: Option<SyncLoopMetrics>, metrics_sync: Option<SyncLoopMetrics>,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient> { ) -> Result<(), FailedClient> {
let mut progress_context = (Instant::now(), None, None); let mut progress_context = (Instant::now(), None, None);
+1 -1
View File
@@ -43,7 +43,7 @@ pub enum HeaderStatus {
} }
/// Headers synchronization pipeline. /// Headers synchronization pipeline.
pub trait HeadersSyncPipeline: Clone + Send + Sync { pub trait HeadersSyncPipeline: 'static + Clone + Send + Sync {
/// Name of the headers source. /// Name of the headers source.
const SOURCE_NAME: &'static str; const SOURCE_NAME: &'static str;
/// Name of the headers target. /// Name of the headers target.
+1 -1
View File
@@ -23,7 +23,7 @@ use relay_utils::{BlockNumberBase, HeaderId};
use std::fmt::Debug; use std::fmt::Debug;
/// One-way message lane. /// One-way message lane.
pub trait MessageLane: Clone + Send + Sync { pub trait MessageLane: 'static + Clone + Send + Sync {
/// Name of the messages source. /// Name of the messages source.
const SOURCE_NAME: &'static str; const SOURCE_NAME: &'static str;
/// Name of the messages target. /// Name of the messages target.
@@ -227,7 +227,7 @@ pub async fn run<P: MessageLane>(
source_client: impl SourceClient<P>, source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>, target_client: impl TargetClient<P>,
metrics_params: MetricsParams, metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()> + Send + 'static,
) -> Result<(), String> { ) -> Result<(), String> {
let exit_signal = exit_signal.shared(); let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client) relay_utils::relay_loop(source_client, target_client)
@@ -237,7 +237,9 @@ pub async fn run<P: MessageLane>(
.standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))? .standalone_metric(|registry, prefix| GlobalMetrics::new(registry, prefix))?
.expose() .expose()
.await? .await?
.run(|source_client, target_client, metrics| { .run(
metrics_prefix::<P>(&params.lane),
move |source_client, target_client, metrics| {
run_until_connection_lost( run_until_connection_lost(
params.clone(), params.clone(),
source_client, source_client,
@@ -245,7 +247,8 @@ pub async fn run<P: MessageLane>(
metrics, metrics,
exit_signal.clone(), exit_signal.clone(),
) )
}) },
)
.await .await
} }
@@ -701,7 +704,7 @@ pub(crate) mod tests {
data: TestClientData, data: TestClientData,
source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>, source_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>, target_tick: Arc<dyn Fn(&mut TestClientData) + Send + Sync>,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()> + 'static + Send,
) -> TestClientData { ) -> TestClientData {
async_std::task::block_on(async { async_std::task::block_on(async {
let data = Arc::new(Mutex::new(data)); let data = Arc::new(Mutex::new(data));
+43 -3
View File
@@ -16,7 +16,11 @@
//! Relayer initialization functions. //! Relayer initialization functions.
use std::{fmt::Display, io::Write}; use std::{cell::RefCell, fmt::Display, io::Write};
async_std::task_local! {
pub(crate) static LOOP_NAME: RefCell<String> = RefCell::new(String::default());
}
/// Initialize relay environment. /// Initialize relay environment.
pub fn initialize_relay() { pub fn initialize_relay() {
@@ -43,20 +47,56 @@ pub fn initialize_logger(with_timestamp: bool) {
Either::Right(ansi_term::Colour::Fixed(8).bold().paint(timestamp)) Either::Right(ansi_term::Colour::Fixed(8).bold().paint(timestamp))
}; };
writeln!(buf, "{} {} {} {}", timestamp, log_level, log_target, record.args(),) writeln!(
buf,
"{}{} {} {} {}",
loop_name_prefix(),
timestamp,
log_level,
log_target,
record.args(),
)
}); });
} else { } else {
builder.format(move |buf, record| { builder.format(move |buf, record| {
let log_level = color_level(record.level()); let log_level = color_level(record.level());
let log_target = color_target(record.target()); let log_target = color_target(record.target());
writeln!(buf, "{} {} {}", log_level, log_target, record.args(),) writeln!(
buf,
"{}{} {} {}",
loop_name_prefix(),
log_level,
log_target,
record.args(),
)
}); });
} }
builder.init(); builder.init();
} }
/// Initialize relay loop. Must only be called once per every loop task.
pub(crate) fn initialize_loop(loop_name: String) {
LOOP_NAME.with(|g_loop_name| *g_loop_name.borrow_mut() = loop_name);
}
/// Returns loop name prefix to use in logs. The prefix is initialized with the `initialize_loop` call.
fn loop_name_prefix() -> String {
// try_with to avoid panic outside of async-std task context
LOOP_NAME
.try_with(|loop_name| {
// using borrow is ok here, because loop is only initialized once (=> borrow_mut will only be called once)
let loop_name = loop_name.borrow();
if loop_name.is_empty() {
String::new()
} else {
format!("[{}] ", loop_name)
}
})
.unwrap_or_else(|_| String::new())
}
enum Either<A, B> { enum Either<A, B> {
Left(A), Left(A),
Right(B), Right(B),
+16 -14
View File
@@ -26,9 +26,9 @@ pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);
/// Basic blockchain client from relay perspective. /// Basic blockchain client from relay perspective.
#[async_trait] #[async_trait]
pub trait Client: Clone + Send + Sync { pub trait Client: 'static + Clone + Send + Sync {
/// Type of error this clients returns. /// Type of error this clients returns.
type Error: Debug + MaybeConnectionError; type Error: 'static + Debug + MaybeConnectionError + Send + Sync;
/// Try to reconnect to source node. /// Try to reconnect to source node.
async fn reconnect(&mut self) -> Result<(), Self::Error>; async fn reconnect(&mut self) -> Result<(), Self::Error>;
@@ -105,21 +105,21 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
/// This function represents an outer loop, which in turn calls provided `run_loop` function to do /// This function represents an outer loop, which in turn calls provided `run_loop` function to do
/// actual job. When `run_loop` returns, this outer loop reconnects to failed client (source, /// actual job. When `run_loop` returns, this outer loop reconnects to failed client (source,
/// target or both) and calls `run_loop` again. /// target or both) and calls `run_loop` again.
pub async fn run<R, F>(mut self, run_loop: R) -> Result<(), String> pub async fn run<R, F>(mut self, loop_name: String, run_loop: R) -> Result<(), String>
where where
R: Fn(SC, TC, Option<LM>) -> F, R: 'static + Send + Fn(SC, TC, Option<LM>) -> F,
F: Future<Output = Result<(), FailedClient>>, F: 'static + Send + Future<Output = Result<(), FailedClient>>,
SC: Client, SC: 'static + Client,
TC: Client, TC: 'static + Client,
LM: Clone, LM: 'static + Send + Clone,
{ {
async_std::task::spawn(async move {
crate::initialize::initialize_loop(loop_name);
loop { loop {
let result = run_loop( let loop_metric = self.loop_metric.clone();
self.source_client.clone(), let future_result = run_loop(self.source_client.clone(), self.target_client.clone(), loop_metric);
self.target_client.clone(), let result = future_result.await;
self.loop_metric.clone(),
)
.await;
match result { match result {
Ok(()) => break, Ok(()) => break,
@@ -162,6 +162,8 @@ impl<SC, TC, LM> Loop<SC, TC, LM> {
} }
Ok(()) Ok(())
})
.await
} }
} }