Do not stall on lost transaction (#1903)

* `select_nonces_to_deliver` is no longer `&mut self`

* reset submitted nonces on lost transaction

* clippy

* fmt
This commit is contained in:
Svyatoslav Nikolsky
2023-02-24 09:01:13 +03:00
committed by Bastian Köcher
parent e7debc4b09
commit dd58493ea4
4 changed files with 36 additions and 82 deletions
@@ -1017,58 +1017,6 @@ pub(crate) mod tests {
assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],); assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],);
} }
#[test]
fn message_lane_loop_is_able_to_recover_from_race_stall() {
// with this configuration, both source and target clients will lose their transactions =>
// reconnect will happen
let (source_exit_sender, exit_receiver) = unbounded();
let target_exit_sender = source_exit_sender.clone();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
source_latest_generated_nonce: 1,
source_tracked_transaction_status: TrackedTransactionStatus::Lost,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
target_latest_received_nonce: 0,
target_tracked_transaction_status: TrackedTransactionStatus::Lost,
..Default::default()
})),
Arc::new(move |data: &mut TestClientData| {
if data.is_source_reconnected {
data.source_tracked_transaction_status =
TrackedTransactionStatus::Finalized(Default::default());
}
if data.is_source_reconnected && data.is_target_reconnected {
source_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
if data.is_target_reconnected {
data.target_tracked_transaction_status =
TrackedTransactionStatus::Finalized(Default::default());
}
if data.is_source_reconnected && data.is_target_reconnected {
target_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);
assert!(result.is_source_reconnected);
}
#[test] #[test]
fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() { fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() {
// with this configuration, both source and target clients will mine their transactions, but // with this configuration, both source and target clients will mine their transactions, but
@@ -1146,7 +1094,6 @@ pub(crate) mod tests {
exit_receiver.into_future().map(|(_, _)| ()), exit_receiver.into_future().map(|(_, _)| ()),
); );
assert!(result.is_source_reconnected);
assert_eq!(result.submitted_messages_proofs.len(), 2); assert_eq!(result.submitted_messages_proofs.len(), 2);
assert_eq!(result.submitted_messages_receiving_proofs.len(), 2); assert_eq!(result.submitted_messages_receiving_proofs.len(), 2);
} }
@@ -292,11 +292,16 @@ impl<P: MessageLane, SC, TC> std::fmt::Debug for MessageDeliveryStrategy<P, SC,
impl<P: MessageLane, SC, TC> MessageDeliveryStrategy<P, SC, TC> { impl<P: MessageLane, SC, TC> MessageDeliveryStrategy<P, SC, TC> {
/// Returns total weight of all undelivered messages. /// Returns total weight of all undelivered messages.
fn total_queued_dispatch_weight(&self) -> Weight { fn dispatch_weight_for_range(&self, range: &RangeInclusive<MessageNonce>) -> Weight {
self.strategy self.strategy
.source_queue() .source_queue()
.iter() .iter()
.flat_map(|(_, range)| range.values().map(|details| details.dispatch_weight)) .flat_map(|(_, subrange)| {
subrange
.iter()
.filter(|(nonce, _)| range.contains(nonce))
.map(|(_, details)| details.dispatch_weight)
})
.fold(Weight::zero(), |total, weight| total.saturating_add(weight)) .fold(Weight::zero(), |total, weight| total.saturating_add(weight))
} }
} }
@@ -424,7 +429,7 @@ where
} }
async fn select_nonces_to_deliver( async fn select_nonces_to_deliver(
&mut self, &self,
race_state: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>, race_state: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> { ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
let best_finalized_source_header_id_at_best_target = let best_finalized_source_header_id_at_best_target =
@@ -526,7 +531,6 @@ where
let maximal_source_queue_index = let maximal_source_queue_index =
self.strategy.maximal_available_source_queue_index(race_state)?; self.strategy.maximal_available_source_queue_index(race_state)?;
let previous_total_dispatch_weight = self.total_queued_dispatch_weight();
let source_queue = self.strategy.source_queue(); let source_queue = self.strategy.source_queue();
let reference = RelayMessagesBatchReference { let reference = RelayMessagesBatchReference {
@@ -544,10 +548,7 @@ where
let range_begin = source_queue[0].1.begin(); let range_begin = source_queue[0].1.begin();
let selected_nonces = range_begin..=range_end; let selected_nonces = range_begin..=range_end;
self.strategy.remove_le_nonces_from_source_queue(range_end); let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);
let new_total_dispatch_weight = self.total_queued_dispatch_weight();
let dispatch_weight = previous_total_dispatch_weight - new_total_dispatch_weight;
Some(( Some((
selected_nonces, selected_nonces,
@@ -707,7 +708,7 @@ mod tests {
#[async_std::test] #[async_std::test]
async fn message_delivery_strategy_selects_messages_to_deliver() { async fn message_delivery_strategy_selects_messages_to_deliver() {
let (state, mut strategy) = prepare_strategy(); let (state, strategy) = prepare_strategy();
// both sides are ready to relay new messages // both sides are ready to relay new messages
assert_eq!( assert_eq!(
@@ -211,7 +211,7 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
/// data) from source to target node. /// data) from source to target node.
/// Additionally, parameters required to generate proof are returned. /// Additionally, parameters required to generate proof are returned.
async fn select_nonces_to_deliver( async fn select_nonces_to_deliver(
&mut self, &self,
race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>, race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>; ) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>;
} }
@@ -234,6 +234,13 @@ pub struct RaceState<SourceHeaderId, TargetHeaderId, Proof> {
pub nonces_submitted: Option<RangeInclusive<MessageNonce>>, pub nonces_submitted: Option<RangeInclusive<MessageNonce>>,
} }
impl<SourceHeaderId, TargetHeaderId, Proof> RaceState<SourceHeaderId, TargetHeaderId, Proof> {
/// Reset `nonces_submitted` to `None`.
fn reset_submitted(&mut self) {
self.nonces_submitted = None;
}
}
/// Run race loop until connection with target or source node is lost. /// Run race loop until connection with target or source node is lost.
pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>( pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
race_source: SC, race_source: SC,
@@ -460,7 +467,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
(TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => { (TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => {
// our transaction has been mined, but was it successful or not? let's check the best // our transaction has been mined, but was it successful or not? let's check the best
// nonce at the target node. // nonce at the target node.
race_target.nonces(at_block, false) let _ = race_target.nonces(at_block, false)
.await .await
.map_err(|e| format!("failed to read nonces from target node: {e:?}")) .map_err(|e| format!("failed to read nonces from target node: {e:?}"))
.and_then(|(_, nonces_at_target)| { .and_then(|(_, nonces_at_target)| {
@@ -477,26 +484,26 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
.map_err(|e| { .map_err(|e| {
log::error!( log::error!(
target: "bridge", target: "bridge",
"{} -> {} race has stalled. Transaction failed: {}. Going to restart", "{} -> {} race transaction failed: {}",
P::source_name(), P::source_name(),
P::target_name(), P::target_name(),
e, e,
); );
FailedClient::Both race_state.reset_submitted();
})?; });
}, },
(TrackedTransactionStatus::Lost, _) => { (TrackedTransactionStatus::Lost, _) => {
log::warn!( log::warn!(
target: "bridge", target: "bridge",
"{} -> {} race has stalled. State: {:?}. Strategy: {:?}", "{} -> {} race transaction has been lost. State: {:?}. Strategy: {:?}",
P::source_name(), P::source_name(),
P::target_name(), P::target_name(),
race_state, race_state,
strategy, strategy,
); );
return Err(FailedClient::Both); race_state.reset_submitted();
}, },
_ => (), _ => (),
} }
@@ -531,8 +538,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
race_state.clone() race_state.clone()
}; };
let nonces_to_deliver = let nonces_to_deliver = select_nonces_to_deliver(expected_race_state, &strategy).await;
select_nonces_to_deliver(expected_race_state, &mut strategy).await;
let best_at_source = strategy.best_at_source(); let best_at_source = strategy.best_at_source();
if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver { if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
@@ -665,7 +671,7 @@ where
async fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>( async fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>(
race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>, race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>,
strategy: &mut Strategy, strategy: &Strategy,
) -> Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Strategy::ProofParameters)> ) -> Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Strategy::ProofParameters)>
where where
SourceHeaderId: Clone, SourceHeaderId: Clone,
@@ -723,7 +729,7 @@ mod tests {
// the proof will be generated on source, but using BEST_AT_TARGET block // the proof will be generated on source, but using BEST_AT_TARGET block
assert_eq!( assert_eq!(
select_nonces_to_deliver(race_state, &mut strategy).await, select_nonces_to_deliver(race_state, &strategy).await,
Some((HeaderId(BEST_AT_TARGET, BEST_AT_TARGET), 6..=10, (),)) Some((HeaderId(BEST_AT_TARGET, BEST_AT_TARGET), 6..=10, (),))
); );
} }
@@ -136,7 +136,7 @@ impl<
} }
/// Remove all nonces that are less than or equal to given nonce from the source queue. /// Remove all nonces that are less than or equal to given nonce from the source queue.
pub fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) { fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) {
while let Some((queued_at, queued_range)) = self.source_queue.pop_front() { while let Some((queued_at, queued_range)) = self.source_queue.pop_front() {
if let Some(range_to_requeue) = queued_range.greater_than(nonce) { if let Some(range_to_requeue) = queued_range.greater_than(nonce) {
self.source_queue.push_front((queued_at, range_to_requeue)); self.source_queue.push_front((queued_at, range_to_requeue));
@@ -168,12 +168,12 @@ impl<
SourceNoncesRange, SourceNoncesRange,
Proof, Proof,
> where > where
SourceHeaderHash: Clone + Debug + Send, SourceHeaderHash: Clone + Debug + Send + Sync,
SourceHeaderNumber: Clone + Ord + Debug + Send, SourceHeaderNumber: Clone + Ord + Debug + Send + Sync,
SourceNoncesRange: NoncesRange + Debug + Send, SourceNoncesRange: NoncesRange + Debug + Send + Sync,
TargetHeaderHash: Debug + Send, TargetHeaderHash: Debug + Send + Sync,
TargetHeaderNumber: Debug + Send, TargetHeaderNumber: Debug + Send + Sync,
Proof: Debug + Send, Proof: Debug + Send + Sync,
{ {
type SourceNoncesRange = SourceNoncesRange; type SourceNoncesRange = SourceNoncesRange;
type ProofParameters = (); type ProofParameters = ();
@@ -284,6 +284,7 @@ impl<
Proof, Proof,
>, >,
) { ) {
self.remove_le_nonces_from_source_queue(nonces.latest_nonce); // TODO: does it means that we'll try to submit old nonces in next tx???
self.best_target_nonce = Some(std::cmp::max( self.best_target_nonce = Some(std::cmp::max(
self.best_target_nonce.unwrap_or(nonces.latest_nonce), self.best_target_nonce.unwrap_or(nonces.latest_nonce),
nonces.latest_nonce, nonces.latest_nonce,
@@ -291,7 +292,7 @@ impl<
} }
async fn select_nonces_to_deliver( async fn select_nonces_to_deliver(
&mut self, &self,
race_state: RaceState< race_state: RaceState<
HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>,
@@ -301,7 +302,6 @@ impl<
let maximal_source_queue_index = self.maximal_available_source_queue_index(race_state)?; let maximal_source_queue_index = self.maximal_available_source_queue_index(race_state)?;
let range_begin = self.source_queue[0].1.begin(); let range_begin = self.source_queue[0].1.begin();
let range_end = self.source_queue[maximal_source_queue_index].1.end(); let range_end = self.source_queue[maximal_source_queue_index].1.end();
self.remove_le_nonces_from_source_queue(range_end);
Some((range_begin..=range_end, ())) Some((range_begin..=range_end, ()))
} }
} }