Finality loop - cosmetic changes (#1609)

* Move some logic to RestartableFinalityProofsStream

* Move some logic to `Transaction`

* Avoid unnecessary split_off
This commit is contained in:
Serban Iorga
2022-10-20 17:03:37 +03:00
committed by Bastian Köcher
parent a3dc2d2748
commit 597ea49664
2 changed files with 132 additions and 115 deletions
+124 -104
View File
@@ -32,6 +32,7 @@ use relay_utils::{
HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker, HeaderId, MaybeConnectionError, TrackedTransactionStatus, TransactionTracker,
}; };
use std::{ use std::{
fmt::Debug,
pin::Pin, pin::Pin,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@@ -182,15 +183,114 @@ pub(crate) struct Transaction<Tracker, Number> {
pub submitted_header_number: Number, pub submitted_header_number: Number,
} }
impl<Tracker: TransactionTracker, Number: Debug + PartialOrd> Transaction<Tracker, Number> {
pub async fn submit<
C: TargetClient<P, TransactionTracker = Tracker>,
P: FinalitySyncPipeline<Number = Number>,
>(
target_client: &C,
header: P::Header,
justification: P::FinalityProof,
) -> Result<Self, C::Error> {
let submitted_header_number = header.number();
log::debug!(
target: "bridge",
"Going to submit finality proof of {} header #{:?} to {}",
P::SOURCE_NAME,
submitted_header_number,
P::TARGET_NAME,
);
let tracker = target_client.submit_finality_proof(header, justification).await?;
Ok(Transaction { tracker, submitted_header_number })
}
pub async fn track<C: TargetClient<P>, P: FinalitySyncPipeline<Number = Number>>(
self,
target_client: &C,
) -> Result<(), String> {
match self.tracker.wait().await {
TrackedTransactionStatus::Finalized(_) => {
// The transaction has been finalized, but it may have been finalized in the
// "failed" state. So let's check if the block number was actually updated.
// If it wasn't then we are stalled.
//
// Please also note that we're returning an error if we fail to read required data
// from the target client - that's the best we can do here to avoid actual stall.
target_client
.best_finalized_source_block_id()
.await
.map_err(|e| format!("failed to read best block from target node: {:?}", e))
.and_then(|best_id_at_target| {
if self.submitted_header_number > best_id_at_target.0 {
return Err(format!(
"best block at target after tx is {:?} and we've submitted {:?}",
best_id_at_target.0, self.submitted_header_number,
))
}
Ok(())
})
},
TrackedTransactionStatus::Lost => Err("transaction failed".to_string()),
}
}
}
/// Finality proofs stream that may be restarted. /// Finality proofs stream that may be restarted.
pub(crate) struct RestartableFinalityProofsStream<S> { pub(crate) struct RestartableFinalityProofsStream<S> {
/// Flag that the stream needs to be restarted. /// Flag that the stream needs to be restarted.
pub(crate) needs_restart: bool, pub(crate) needs_restart: bool,
/// The stream itself. /// The stream itself.
pub(crate) stream: Pin<Box<S>>, stream: Pin<Box<S>>,
}
impl<S: Stream> RestartableFinalityProofsStream<S> {
pub async fn create_raw_stream<
C: SourceClient<P, FinalityProofsStream = S>,
P: FinalitySyncPipeline,
>(
source_client: &C,
) -> Result<S, FailedClient> {
source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
P::SOURCE_NAME,
error,
);
FailedClient::Source
})
}
pub async fn restart_if_scheduled<
C: SourceClient<P, FinalityProofsStream = S>,
P: FinalitySyncPipeline,
>(
&mut self,
source_client: &C,
) -> Result<(), FailedClient> {
if self.needs_restart {
log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);
self.needs_restart = false;
self.stream = Box::pin(Self::create_raw_stream(source_client).await?);
}
Ok(())
}
pub fn next(&mut self) -> Option<S::Item> {
match self.stream.next().now_or_never() {
Some(Some(finality_proof)) => Some(finality_proof),
Some(None) => {
self.needs_restart = true;
None
},
None => None,
}
}
} }
#[cfg(test)]
impl<S> From<S> for RestartableFinalityProofsStream<S> { impl<S> From<S> for RestartableFinalityProofsStream<S> {
fn from(stream: S) -> Self { fn from(stream: S) -> Self {
RestartableFinalityProofsStream { needs_restart: false, stream: Box::pin(stream) } RestartableFinalityProofsStream { needs_restart: false, stream: Box::pin(stream) }
@@ -218,27 +318,12 @@ pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
metrics_sync: Option<SyncLoopMetrics>, metrics_sync: Option<SyncLoopMetrics>,
exit_signal: impl Future<Output = ()>, exit_signal: impl Future<Output = ()>,
) -> Result<(), FailedClient> { ) -> Result<(), FailedClient> {
let restart_finality_proofs_stream = || async {
source_client.finality_proofs().await.map_err(|error| {
log::error!(
target: "bridge",
"Failed to subscribe to {} justifications: {:?}. Going to reconnect",
P::SOURCE_NAME,
error,
);
FailedClient::Source
})
};
let last_transaction_tracker = futures::future::Fuse::terminated(); let last_transaction_tracker = futures::future::Fuse::terminated();
let exit_signal = exit_signal.fuse(); let exit_signal = exit_signal.fuse();
futures::pin_mut!(last_transaction_tracker, exit_signal); futures::pin_mut!(last_transaction_tracker, exit_signal);
let mut finality_proofs_stream = RestartableFinalityProofsStream { let mut finality_proofs_stream =
needs_restart: false, RestartableFinalityProofsStream::create_raw_stream(&source_client).await?.into();
stream: Box::pin(restart_finality_proofs_stream().await?),
};
let mut recent_finality_proofs = Vec::new(); let mut recent_finality_proofs = Vec::new();
let mut progress = (Instant::now(), None); let mut progress = (Instant::now(), None);
@@ -263,10 +348,9 @@ pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
// deal with errors // deal with errors
let next_tick = match iteration_result { let next_tick = match iteration_result {
Ok(Some(updated_last_transaction)) => { Ok(Some(updated_transaction)) => {
last_transaction_tracker.set(updated_last_transaction.tracker.wait().fuse()); last_submitted_header_number = Some(updated_transaction.submitted_header_number);
last_submitted_header_number = last_transaction_tracker.set(updated_transaction.track(&target_client).fuse());
Some(updated_last_transaction.submitted_header_number);
retry_backoff.reset(); retry_backoff.reset();
sync_params.tick sync_params.tick
}, },
@@ -280,66 +364,23 @@ pub(crate) async fn run_until_connection_lost<P: FinalitySyncPipeline>(
retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY) retry_backoff.next_backoff().unwrap_or(relay_utils::relay_loop::RECONNECT_DELAY)
}, },
}; };
if finality_proofs_stream.needs_restart { finality_proofs_stream.restart_if_scheduled(&source_client).await?;
log::warn!(target: "bridge", "{} finality proofs stream is being restarted", P::SOURCE_NAME);
finality_proofs_stream.needs_restart = false;
finality_proofs_stream.stream = Box::pin(restart_finality_proofs_stream().await?);
}
// wait till exit signal, or new source block // wait till exit signal, or new source block
select! { select! {
transaction_status = last_transaction_tracker => { transaction_result = last_transaction_tracker => {
match transaction_status { transaction_result.map_err(|e| {
TrackedTransactionStatus::Finalized(_) => {
// transaction has been finalized, but it may have been finalized in the "failed" state. So
// let's check if the block number has been actually updated. If it is not, then we are stalled.
//
// please also note that we're restarting the loop if we have failed to read required data
// from the target client - that's the best we can do here to avoid actual stall.
target_client
.best_finalized_source_block_id()
.await
.map_err(|e| format!("failed to read best block from target node: {:?}", e))
.and_then(|best_id_at_target| {
let last_submitted_header_number = last_submitted_header_number
.expect("always Some when last_transaction_tracker is set;\
last_transaction_tracker is set;\
qed");
if last_submitted_header_number > best_id_at_target.0 {
Err(format!(
"best block at target after tx is {:?} and we've submitted {:?}",
best_id_at_target,
last_submitted_header_number,
))
} else {
Ok(())
}
})
.map_err(|e| {
log::error!( log::error!(
target: "bridge", target: "bridge",
"Failed Finality synchronization from {} to {} has stalled. Transaction failed: {}. \ "Finality synchronization from {} to {} has stalled with error: {}. Going to restart",
Going to restart",
P::SOURCE_NAME, P::SOURCE_NAME,
P::TARGET_NAME, P::TARGET_NAME,
e, e,
); );
// Restart the loop if we're stalled.
FailedClient::Both FailedClient::Both
})?; })?
},
TrackedTransactionStatus::Lost => {
log::error!(
target: "bridge",
"Finality synchronization from {} to {} has stalled. Going to restart",
P::SOURCE_NAME,
P::TARGET_NAME,
);
return Err(FailedClient::Both);
},
}
}, },
_ = async_std::task::sleep(next_tick).fuse() => {}, _ = async_std::task::sleep(next_tick).fuse() => {},
_ = exit_signal => return Ok(()), _ = exit_signal => return Ok(()),
@@ -414,20 +455,10 @@ where
.await? .await?
{ {
Some((header, justification)) => { Some((header, justification)) => {
let submitted_header_number = header.number(); let transaction = Transaction::submit(target_client, header, justification)
log::debug!(
target: "bridge",
"Going to submit finality proof of {} header #{:?} to {}",
P::SOURCE_NAME,
submitted_header_number,
P::TARGET_NAME,
);
let tracker = target_client
.submit_finality_proof(header, justification)
.await .await
.map_err(Error::Target)?; .map_err(Error::Target)?;
Ok(Some(Transaction { tracker, submitted_header_number })) Ok(Some(transaction))
}, },
None => Ok(None), None => Ok(None),
} }
@@ -598,17 +629,7 @@ pub(crate) fn read_finality_proofs_from_stream<
let mut proofs_count = 0; let mut proofs_count = 0;
let mut first_header_number = None; let mut first_header_number = None;
let mut last_header_number = None; let mut last_header_number = None;
loop { while let Some(finality_proof) = finality_proofs_stream.next() {
let next_proof = finality_proofs_stream.stream.next();
let finality_proof = match next_proof.now_or_never() {
Some(Some(finality_proof)) => finality_proof,
Some(None) => {
finality_proofs_stream.needs_restart = true;
break
},
None => break,
};
let target_header_number = finality_proof.target_header_number(); let target_header_number = finality_proof.target_header_number();
if first_header_number.is_none() { if first_header_number.is_none() {
first_header_number = Some(target_header_number); first_header_number = Some(target_header_number);
@@ -700,16 +721,15 @@ pub(crate) fn prune_recent_finality_proofs<P: FinalitySyncPipeline>(
recent_finality_proofs: &mut FinalityProofs<P>, recent_finality_proofs: &mut FinalityProofs<P>,
recent_finality_proofs_limit: usize, recent_finality_proofs_limit: usize,
) { ) {
let position = recent_finality_proofs let justified_header_idx = recent_finality_proofs
.binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number); .binary_search_by_key(&justified_header_number, |(header_number, _)| *header_number)
.map(|idx| idx + 1)
.unwrap_or_else(|idx| idx);
let proofs_limit_idx =
recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit);
// remove all obsolete elements *recent_finality_proofs =
*recent_finality_proofs = recent_finality_proofs recent_finality_proofs.split_off(std::cmp::max(justified_header_idx, proofs_limit_idx));
.split_off(position.map(|position| position + 1).unwrap_or_else(|position| position));
// now - limit vec by size
let split_index = recent_finality_proofs.len().saturating_sub(recent_finality_proofs_limit);
*recent_finality_proofs = recent_finality_proofs.split_off(split_index);
} }
fn print_sync_progress<P: FinalitySyncPipeline>( fn print_sync_progress<P: FinalitySyncPipeline>(
@@ -552,10 +552,7 @@ fn different_forks_at_source_and_at_target_are_detected() {
); );
let mut progress = (Instant::now(), None); let mut progress = (Instant::now(), None);
let mut finality_proofs_stream = RestartableFinalityProofsStream { let mut finality_proofs_stream = futures::stream::iter(vec![]).boxed().into();
needs_restart: false,
stream: Box::pin(futures::stream::iter(vec![]).boxed()),
};
let mut recent_finality_proofs = Vec::new(); let mut recent_finality_proofs = Vec::new();
let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap(); let metrics_sync = SyncLoopMetrics::new(None, "source", "target").unwrap();
async_std::task::block_on(run_loop_iteration::<TestFinalitySyncPipeline, _, _>( async_std::task::block_on(run_loop_iteration::<TestFinalitySyncPipeline, _, _>(