mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 19:21:13 +00:00
Keep multiple latest confirmed nonces at source in messages relay (#719)
* keep multiple latest confirmed nonces at source in messages relay * post-merge fix
This commit is contained in:
committed by
Bastian Köcher
parent
705a41528f
commit
c4b931ba29
@@ -29,7 +29,12 @@ use async_trait::async_trait;
|
||||
use bp_message_lane::{MessageNonce, UnrewardedRelayersState, Weight};
|
||||
use futures::stream::FusedStream;
|
||||
use relay_utils::FailedClient;
|
||||
use std::{collections::BTreeMap, marker::PhantomData, ops::RangeInclusive, time::Duration};
|
||||
use std::{
|
||||
collections::{BTreeMap, VecDeque},
|
||||
marker::PhantomData,
|
||||
ops::RangeInclusive,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// Run message delivery race.
|
||||
pub async fn run<P: MessageLane>(
|
||||
@@ -61,7 +66,7 @@ pub async fn run<P: MessageLane>(
|
||||
max_messages_in_single_batch: params.max_messages_in_single_batch,
|
||||
max_messages_weight_in_single_batch: params.max_messages_weight_in_single_batch,
|
||||
max_messages_size_in_single_batch: params.max_messages_size_in_single_batch,
|
||||
latest_confirmed_nonce_at_source: None,
|
||||
latest_confirmed_nonces_at_source: VecDeque::new(),
|
||||
target_nonces: None,
|
||||
strategy: BasicStrategy::new(),
|
||||
},
|
||||
@@ -164,14 +169,17 @@ where
|
||||
async fn nonces(
|
||||
&self,
|
||||
at_block: TargetHeaderIdOf<P>,
|
||||
update_metrics: bool,
|
||||
) -> Result<(TargetHeaderIdOf<P>, TargetClientNonces<DeliveryRaceTargetNoncesData>), Self::Error> {
|
||||
let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
|
||||
let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?;
|
||||
let (at_block, unrewarded_relayers) = self.client.unrewarded_relayers_state(at_block).await?;
|
||||
|
||||
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
|
||||
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
|
||||
metrics_msg.update_target_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
|
||||
if update_metrics {
|
||||
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
|
||||
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
|
||||
metrics_msg.update_target_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
@@ -221,8 +229,8 @@ struct MessageDeliveryStrategy<P: MessageLane> {
|
||||
max_messages_weight_in_single_batch: Weight,
|
||||
/// Maximal messages size in the single delivery transaction.
|
||||
max_messages_size_in_single_batch: usize,
|
||||
/// Latest confirmed nonce at the source client.
|
||||
latest_confirmed_nonce_at_source: Option<MessageNonce>,
|
||||
/// Latest confirmed nonces at the source client + the header id where we have first met this nonce.
|
||||
latest_confirmed_nonces_at_source: VecDeque<(SourceHeaderIdOf<P>, MessageNonce)>,
|
||||
/// Target nonces from the source client.
|
||||
target_nonces: Option<TargetClientNonces<DeliveryRaceTargetNoncesData>>,
|
||||
/// Basic delivery strategy.
|
||||
@@ -259,8 +267,8 @@ impl<P: MessageLane> std::fmt::Debug for MessageDeliveryStrategy<P> {
|
||||
&self.max_messages_size_in_single_batch,
|
||||
)
|
||||
.field(
|
||||
"latest_confirmed_noncs_at_source",
|
||||
&self.latest_confirmed_nonce_at_source,
|
||||
"latest_confirmed_nonces_at_source",
|
||||
&self.latest_confirmed_nonces_at_source,
|
||||
)
|
||||
.field("target_nonces", &self.target_nonces)
|
||||
.field("strategy", &self.strategy)
|
||||
@@ -292,17 +300,64 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
|
||||
at_block: SourceHeaderIdOf<P>,
|
||||
nonces: SourceClientNonces<Self::SourceNoncesRange>,
|
||||
) {
|
||||
self.latest_confirmed_nonce_at_source = nonces.confirmed_nonce;
|
||||
if let Some(confirmed_nonce) = nonces.confirmed_nonce {
|
||||
let is_confirmed_nonce_updated = self
|
||||
.latest_confirmed_nonces_at_source
|
||||
.back()
|
||||
.map(|(_, prev_nonce)| *prev_nonce != confirmed_nonce)
|
||||
.unwrap_or(true);
|
||||
if is_confirmed_nonce_updated {
|
||||
self.latest_confirmed_nonces_at_source
|
||||
.push_back((at_block.clone(), confirmed_nonce));
|
||||
}
|
||||
}
|
||||
self.strategy.source_nonces_updated(at_block, nonces)
|
||||
}
|
||||
|
||||
fn target_nonces_updated(
|
||||
fn best_target_nonces_updated(
|
||||
&mut self,
|
||||
nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
|
||||
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
|
||||
) {
|
||||
self.target_nonces = Some(nonces.clone());
|
||||
self.strategy.target_nonces_updated(
|
||||
// best target nonces must always be ge than finalized target nonces
|
||||
let mut target_nonces = self.target_nonces.take().unwrap_or_else(|| nonces.clone());
|
||||
target_nonces.nonces_data = nonces.nonces_data.clone();
|
||||
target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
|
||||
self.target_nonces = Some(target_nonces);
|
||||
|
||||
self.strategy.best_target_nonces_updated(
|
||||
TargetClientNonces {
|
||||
latest_nonce: nonces.latest_nonce,
|
||||
nonces_data: (),
|
||||
},
|
||||
race_state,
|
||||
)
|
||||
}
|
||||
|
||||
fn finalized_target_nonces_updated(
|
||||
&mut self,
|
||||
nonces: TargetClientNonces<DeliveryRaceTargetNoncesData>,
|
||||
race_state: &mut RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
|
||||
) {
|
||||
if let Some(ref best_finalized_source_header_id_at_best_target) =
|
||||
race_state.best_finalized_source_header_id_at_best_target
|
||||
{
|
||||
let oldest_header_number_to_keep = best_finalized_source_header_id_at_best_target.0;
|
||||
while self
|
||||
.latest_confirmed_nonces_at_source
|
||||
.front()
|
||||
.map(|(id, _)| id.0 < oldest_header_number_to_keep)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
self.latest_confirmed_nonces_at_source.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref mut target_nonces) = self.target_nonces {
|
||||
target_nonces.latest_nonce = std::cmp::max(target_nonces.latest_nonce, nonces.latest_nonce);
|
||||
}
|
||||
|
||||
self.strategy.finalized_target_nonces_updated(
|
||||
TargetClientNonces {
|
||||
latest_nonce: nonces.latest_nonce,
|
||||
nonces_data: (),
|
||||
@@ -315,7 +370,14 @@ impl<P: MessageLane> RaceStrategy<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::M
|
||||
&mut self,
|
||||
race_state: &RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
|
||||
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
|
||||
let latest_confirmed_nonce_at_source = self.latest_confirmed_nonce_at_source?;
|
||||
let best_finalized_source_header_id_at_best_target =
|
||||
race_state.best_finalized_source_header_id_at_best_target.clone()?;
|
||||
let latest_confirmed_nonce_at_source = self
|
||||
.latest_confirmed_nonces_at_source
|
||||
.iter()
|
||||
.take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0)
|
||||
.last()
|
||||
.map(|(_, nonce)| *nonce)?;
|
||||
let target_nonces = self.target_nonces.as_ref()?;
|
||||
|
||||
// There's additional condition in the message delivery race: target would reject messages
|
||||
@@ -509,6 +571,7 @@ mod tests {
|
||||
best_finalized_source_header_id_at_source: Some(header_id(1)),
|
||||
best_finalized_source_header_id_at_best_target: Some(header_id(1)),
|
||||
best_target_header_id: Some(header_id(1)),
|
||||
best_finalized_target_header_id: Some(header_id(1)),
|
||||
nonces_to_submit: None,
|
||||
nonces_submitted: None,
|
||||
};
|
||||
@@ -519,7 +582,7 @@ mod tests {
|
||||
max_messages_in_single_batch: 4,
|
||||
max_messages_weight_in_single_batch: 4,
|
||||
max_messages_size_in_single_batch: 4,
|
||||
latest_confirmed_nonce_at_source: Some(19),
|
||||
latest_confirmed_nonces_at_source: vec![(header_id(1), 19)].into_iter().collect(),
|
||||
target_nonces: Some(TargetClientNonces {
|
||||
latest_nonce: 19,
|
||||
nonces_data: DeliveryRaceTargetNoncesData {
|
||||
@@ -548,13 +611,17 @@ mod tests {
|
||||
confirmed_nonce: Some(19),
|
||||
},
|
||||
);
|
||||
race_strategy.strategy.target_nonces_updated(
|
||||
TargetClientNonces {
|
||||
latest_nonce: 19,
|
||||
nonces_data: (),
|
||||
},
|
||||
&mut race_state,
|
||||
);
|
||||
|
||||
let target_nonces = TargetClientNonces {
|
||||
latest_nonce: 19,
|
||||
nonces_data: (),
|
||||
};
|
||||
race_strategy
|
||||
.strategy
|
||||
.best_target_nonces_updated(target_nonces.clone(), &mut race_state);
|
||||
race_strategy
|
||||
.strategy
|
||||
.finalized_target_nonces_updated(target_nonces, &mut race_state);
|
||||
|
||||
(race_state, race_strategy)
|
||||
}
|
||||
@@ -611,8 +678,12 @@ mod tests {
|
||||
|
||||
// if there are already `max_unconfirmed_nonces_at_target` messages on target,
|
||||
// we need to wait until confirmations will be delivered by receiving race
|
||||
strategy.latest_confirmed_nonce_at_source =
|
||||
Some(strategy.target_nonces.as_ref().unwrap().latest_nonce - strategy.max_unconfirmed_nonces_at_target);
|
||||
strategy.latest_confirmed_nonces_at_source = vec![(
|
||||
header_id(1),
|
||||
strategy.target_nonces.as_ref().unwrap().latest_nonce - strategy.max_unconfirmed_nonces_at_target,
|
||||
)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||
}
|
||||
|
||||
@@ -622,7 +693,7 @@ mod tests {
|
||||
|
||||
// if there are new confirmed nonces on source, we want to relay this information
|
||||
// to target to prune rewards queue
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
|
||||
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
|
||||
assert_eq!(
|
||||
strategy.select_nonces_to_deliver(&state),
|
||||
@@ -650,7 +721,7 @@ mod tests {
|
||||
|
||||
// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
|
||||
// we need to prove at least `messages_in_oldest_entry` rewards
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
|
||||
{
|
||||
let mut nonces_data = &mut strategy.target_nonces.as_mut().unwrap().nonces_data;
|
||||
nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
|
||||
@@ -667,7 +738,7 @@ mod tests {
|
||||
|
||||
// if there are already `max_unrewarded_relayer_entries_at_target` entries at target,
|
||||
// we need to prove at least `messages_in_oldest_entry` rewards
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
|
||||
{
|
||||
let mut nonces_data = &mut strategy.target_nonces.as_mut().unwrap().nonces_data;
|
||||
nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 3;
|
||||
@@ -747,12 +818,54 @@ mod tests {
|
||||
|
||||
// 1 delivery confirmation from target to source is still missing, so we may only
|
||||
// relay 3 new messages
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonce_at_source.unwrap();
|
||||
strategy.latest_confirmed_nonce_at_source = Some(prev_confirmed_nonce_at_source - 1);
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
|
||||
strategy.latest_confirmed_nonces_at_source = vec![(header_id(1), prev_confirmed_nonce_at_source - 1)]
|
||||
.into_iter()
|
||||
.collect();
|
||||
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
|
||||
assert_eq!(
|
||||
strategy.select_nonces_to_deliver(&state),
|
||||
Some(((20..=22), proof_parameters(false, 3)))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn message_delivery_strategy_waits_for_confirmed_nonce_header_to_appear_on_target() {
|
||||
// 1 delivery confirmation from target to source is still missing, so we may deliver
|
||||
// reward confirmation with our message delivery transaction. But the problem is that
|
||||
// the reward has been paid at header 2 && this header is still unknown to target node.
|
||||
//
|
||||
// => so we can't deliver more than 3 messages
|
||||
let (mut state, mut strategy) = prepare_strategy();
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
|
||||
strategy.latest_confirmed_nonces_at_source = vec![
|
||||
(header_id(1), prev_confirmed_nonce_at_source - 1),
|
||||
(header_id(2), prev_confirmed_nonce_at_source),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
|
||||
assert_eq!(
|
||||
strategy.select_nonces_to_deliver(&state),
|
||||
Some(((20..=22), proof_parameters(false, 3)))
|
||||
);
|
||||
|
||||
// the same situation, but the header 2 is known to the target node, so we may deliver reward confirmation
|
||||
let (mut state, mut strategy) = prepare_strategy();
|
||||
let prev_confirmed_nonce_at_source = strategy.latest_confirmed_nonces_at_source.back().unwrap().1;
|
||||
strategy.latest_confirmed_nonces_at_source = vec![
|
||||
(header_id(1), prev_confirmed_nonce_at_source - 1),
|
||||
(header_id(2), prev_confirmed_nonce_at_source),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
strategy.target_nonces.as_mut().unwrap().nonces_data.confirmed_nonce = prev_confirmed_nonce_at_source - 1;
|
||||
state.best_finalized_source_header_id_at_source = Some(header_id(2));
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
|
||||
assert_eq!(
|
||||
strategy.select_nonces_to_deliver(&state),
|
||||
Some(((20..=23), proof_parameters(true, 4)))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,6 +127,7 @@ pub trait TargetClient<P: MessageRace> {
|
||||
async fn nonces(
|
||||
&self,
|
||||
at_block: P::TargetHeaderId,
|
||||
update_metrics: bool,
|
||||
) -> Result<(P::TargetHeaderId, TargetClientNonces<Self::TargetNoncesData>), Self::Error>;
|
||||
/// Submit proof to the target client.
|
||||
async fn submit_proof(
|
||||
@@ -160,8 +161,14 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
|
||||
|
||||
/// Called when nonces are updated at source node of the race.
|
||||
fn source_nonces_updated(&mut self, at_block: SourceHeaderId, nonces: SourceClientNonces<Self::SourceNoncesRange>);
|
||||
/// Called when nonces are updated at target node of the race.
|
||||
fn target_nonces_updated(
|
||||
/// Called when best nonces are updated at target node of the race.
|
||||
fn best_target_nonces_updated(
|
||||
&mut self,
|
||||
nonces: TargetClientNonces<Self::TargetNoncesData>,
|
||||
race_state: &mut RaceState<SourceHeaderId, TargetHeaderId, Proof>,
|
||||
);
|
||||
/// Called when finalized nonces are updated at target node of the race.
|
||||
fn finalized_target_nonces_updated(
|
||||
&mut self,
|
||||
nonces: TargetClientNonces<Self::TargetNoncesData>,
|
||||
race_state: &mut RaceState<SourceHeaderId, TargetHeaderId, Proof>,
|
||||
@@ -185,6 +192,8 @@ pub struct RaceState<SourceHeaderId, TargetHeaderId, Proof> {
|
||||
pub best_finalized_source_header_id_at_best_target: Option<SourceHeaderId>,
|
||||
/// Best header id at the target client.
|
||||
pub best_target_header_id: Option<TargetHeaderId>,
|
||||
/// Best finalized header id at the target client.
|
||||
pub best_finalized_target_header_id: Option<TargetHeaderId>,
|
||||
/// Range of nonces that we have selected to submit.
|
||||
pub nonces_to_submit: Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Proof)>,
|
||||
/// Range of nonces that is currently submitted.
|
||||
@@ -220,8 +229,10 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||
|
||||
let mut target_retry_backoff = retry_backoff();
|
||||
let mut target_client_is_online = true;
|
||||
let mut target_nonces_required = false;
|
||||
let target_nonces = futures::future::Fuse::terminated();
|
||||
let mut target_best_nonces_required = false;
|
||||
let mut target_finalized_nonces_required = false;
|
||||
let target_best_nonces = futures::future::Fuse::terminated();
|
||||
let target_finalized_nonces = futures::future::Fuse::terminated();
|
||||
let target_submit_proof = futures::future::Fuse::terminated();
|
||||
let target_go_offline_future = futures::future::Fuse::terminated();
|
||||
|
||||
@@ -231,7 +242,8 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||
source_generate_proof,
|
||||
source_go_offline_future,
|
||||
race_target_updated,
|
||||
target_nonces,
|
||||
target_best_nonces,
|
||||
target_finalized_nonces,
|
||||
target_submit_proof,
|
||||
target_go_offline_future,
|
||||
);
|
||||
@@ -251,14 +263,22 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||
},
|
||||
target_state = race_target_updated.next() => {
|
||||
if let Some(target_state) = target_state {
|
||||
let is_target_state_updated = race_state.best_target_header_id.as_ref()
|
||||
let is_target_best_state_updated = race_state.best_target_header_id.as_ref()
|
||||
!= Some(&target_state.best_self);
|
||||
if is_target_state_updated {
|
||||
target_nonces_required = true;
|
||||
|
||||
if is_target_best_state_updated {
|
||||
target_best_nonces_required = true;
|
||||
race_state.best_target_header_id = Some(target_state.best_self);
|
||||
race_state.best_finalized_source_header_id_at_best_target
|
||||
= Some(target_state.best_finalized_peer_at_best_self);
|
||||
}
|
||||
|
||||
let is_target_finalized_state_updated = race_state.best_finalized_target_header_id.as_ref()
|
||||
!= Some(&target_state.best_finalized_self);
|
||||
if is_target_finalized_state_updated {
|
||||
target_finalized_nonces_required = true;
|
||||
race_state.best_finalized_target_header_id = Some(target_state.best_finalized_self);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@@ -284,8 +304,8 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||
|| format!("Error retrieving nonces from {}", P::source_name()),
|
||||
).fail_if_connection_error(FailedClient::Source)?;
|
||||
},
|
||||
nonces = target_nonces => {
|
||||
target_nonces_required = false;
|
||||
nonces = target_best_nonces => {
|
||||
target_best_nonces_required = false;
|
||||
|
||||
target_client_is_online = process_future_result(
|
||||
nonces,
|
||||
@@ -293,16 +313,41 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||
|(_, nonces)| {
|
||||
log::debug!(
|
||||
target: "bridge",
|
||||
"Received nonces from {}: {:?}",
|
||||
"Received best nonces from {}: {:?}",
|
||||
P::target_name(),
|
||||
nonces,
|
||||
);
|
||||
|
||||
strategy.target_nonces_updated(nonces, &mut race_state);
|
||||
let prev_best_at_target = strategy.best_at_target();
|
||||
strategy.best_target_nonces_updated(nonces, &mut race_state);
|
||||
if strategy.best_at_target() != prev_best_at_target {
|
||||
stall_countdown = Instant::now();
|
||||
}
|
||||
},
|
||||
&mut target_go_offline_future,
|
||||
async_std::task::sleep,
|
||||
|| format!("Error retrieving nonces from {}", P::target_name()),
|
||||
|| format!("Error retrieving best nonces from {}", P::target_name()),
|
||||
).fail_if_connection_error(FailedClient::Target)?;
|
||||
},
|
||||
nonces = target_finalized_nonces => {
|
||||
target_finalized_nonces_required = false;
|
||||
|
||||
target_client_is_online = process_future_result(
|
||||
nonces,
|
||||
&mut target_retry_backoff,
|
||||
|(_, nonces)| {
|
||||
log::debug!(
|
||||
target: "bridge",
|
||||
"Received finalized nonces from {}: {:?}",
|
||||
P::target_name(),
|
||||
nonces,
|
||||
);
|
||||
|
||||
strategy.finalized_target_nonces_updated(nonces, &mut race_state);
|
||||
},
|
||||
&mut target_go_offline_future,
|
||||
async_std::task::sleep,
|
||||
|| format!("Error retrieving finalized nonces from {}", P::target_name()),
|
||||
).fail_if_connection_error(FailedClient::Target)?;
|
||||
},
|
||||
|
||||
@@ -340,6 +385,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||
|
||||
race_state.nonces_to_submit = None;
|
||||
race_state.nonces_submitted = Some(nonces_range);
|
||||
stall_countdown = Instant::now();
|
||||
},
|
||||
&mut target_go_offline_future,
|
||||
async_std::task::sleep,
|
||||
@@ -428,14 +474,25 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||
.submit_proof(at_block.clone(), nonces_range.clone(), proof.clone())
|
||||
.fuse(),
|
||||
);
|
||||
} else if target_nonces_required {
|
||||
log::debug!(target: "bridge", "Asking {} about message nonces", P::target_name());
|
||||
} else if target_best_nonces_required {
|
||||
log::debug!(target: "bridge", "Asking {} about best message nonces", P::target_name());
|
||||
let at_block = race_state
|
||||
.best_target_header_id
|
||||
.as_ref()
|
||||
.expect("target_nonces_required is only true when best_target_header_id is Some; qed")
|
||||
.expect("target_best_nonces_required is only true when best_target_header_id is Some; qed")
|
||||
.clone();
|
||||
target_nonces.set(race_target.nonces(at_block).fuse());
|
||||
target_best_nonces.set(race_target.nonces(at_block, false).fuse());
|
||||
} else if target_finalized_nonces_required {
|
||||
log::debug!(target: "bridge", "Asking {} about finalized message nonces", P::target_name());
|
||||
let at_block = race_state
|
||||
.best_finalized_target_header_id
|
||||
.as_ref()
|
||||
.expect(
|
||||
"target_finalized_nonces_required is only true when\
|
||||
best_finalized_target_header_id is Some; qed",
|
||||
)
|
||||
.clone();
|
||||
target_finalized_nonces.set(race_target.nonces(at_block, true).fuse());
|
||||
} else {
|
||||
target_client_is_online = true;
|
||||
}
|
||||
@@ -449,6 +506,7 @@ impl<SourceHeaderId, TargetHeaderId, Proof> Default for RaceState<SourceHeaderId
|
||||
best_finalized_source_header_id_at_source: None,
|
||||
best_finalized_source_header_id_at_best_target: None,
|
||||
best_target_header_id: None,
|
||||
best_finalized_target_header_id: None,
|
||||
nonces_to_submit: None,
|
||||
nonces_submitted: None,
|
||||
}
|
||||
@@ -523,6 +581,7 @@ mod tests {
|
||||
best_finalized_source_header_id_at_source: Some(HeaderId(BEST_AT_SOURCE, BEST_AT_SOURCE)),
|
||||
best_finalized_source_header_id_at_best_target: Some(HeaderId(BEST_AT_TARGET, BEST_AT_TARGET)),
|
||||
best_target_header_id: Some(HeaderId(0, 0)),
|
||||
best_finalized_target_header_id: Some(HeaderId(0, 0)),
|
||||
nonces_to_submit: None,
|
||||
nonces_submitted: None,
|
||||
};
|
||||
@@ -536,7 +595,7 @@ mod tests {
|
||||
confirmed_nonce: None,
|
||||
},
|
||||
);
|
||||
strategy.target_nonces_updated(
|
||||
strategy.best_target_nonces_updated(
|
||||
TargetClientNonces {
|
||||
latest_nonce: 5u64,
|
||||
nonces_data: (),
|
||||
|
||||
@@ -162,10 +162,13 @@ where
|
||||
async fn nonces(
|
||||
&self,
|
||||
at_block: SourceHeaderIdOf<P>,
|
||||
update_metrics: bool,
|
||||
) -> Result<(SourceHeaderIdOf<P>, TargetClientNonces<()>), Self::Error> {
|
||||
let (at_block, latest_confirmed_nonce) = self.client.latest_confirmed_received_nonce(at_block).await?;
|
||||
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
|
||||
metrics_msg.update_source_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
|
||||
if update_metrics {
|
||||
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
|
||||
metrics_msg.update_source_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
|
||||
}
|
||||
}
|
||||
Ok((
|
||||
at_block,
|
||||
|
||||
@@ -35,8 +35,8 @@ pub struct BasicStrategy<
|
||||
> {
|
||||
/// All queued nonces.
|
||||
source_queue: VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)>,
|
||||
/// Best nonce known to target node. `None` if it has not been received yet.
|
||||
target_nonce: Option<MessageNonce>,
|
||||
/// Best nonce known to target node (at its best block). `None` if it has not been received yet.
|
||||
best_target_nonce: Option<MessageNonce>,
|
||||
/// Unused generic types dump.
|
||||
_phantom: PhantomData<(TargetHeaderNumber, TargetHeaderHash, Proof)>,
|
||||
}
|
||||
@@ -52,7 +52,7 @@ where
|
||||
pub fn new() -> Self {
|
||||
BasicStrategy {
|
||||
source_queue: VecDeque::new(),
|
||||
target_nonce: None,
|
||||
best_target_nonce: None,
|
||||
_phantom: Default::default(),
|
||||
}
|
||||
}
|
||||
@@ -83,7 +83,7 @@ where
|
||||
mut selector: impl FnMut(SourceNoncesRange) -> Option<SourceNoncesRange>,
|
||||
) -> Option<RangeInclusive<MessageNonce>> {
|
||||
// if we do not know best nonce at target node, we can't select anything
|
||||
let target_nonce = self.target_nonce?;
|
||||
let target_nonce = self.best_target_nonce?;
|
||||
|
||||
// if we have already selected nonces that we want to submit, do nothing
|
||||
if race_state.nonces_to_submit.is_some() {
|
||||
@@ -164,15 +164,15 @@ where
|
||||
|
||||
fn best_at_source(&self) -> Option<MessageNonce> {
|
||||
let best_in_queue = self.source_queue.back().map(|(_, range)| range.end());
|
||||
match (best_in_queue, self.target_nonce) {
|
||||
(Some(best_in_queue), Some(target_nonce)) if best_in_queue > target_nonce => Some(best_in_queue),
|
||||
(_, Some(target_nonce)) => Some(target_nonce),
|
||||
match (best_in_queue, self.best_target_nonce) {
|
||||
(Some(best_in_queue), Some(best_target_nonce)) if best_in_queue > best_target_nonce => Some(best_in_queue),
|
||||
(_, Some(best_target_nonce)) => Some(best_target_nonce),
|
||||
(_, None) => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn best_at_target(&self) -> Option<MessageNonce> {
|
||||
self.target_nonce
|
||||
self.best_target_nonce
|
||||
}
|
||||
|
||||
fn source_nonces_updated(
|
||||
@@ -184,7 +184,7 @@ where
|
||||
.source_queue
|
||||
.back()
|
||||
.map(|(_, range)| range.end())
|
||||
.or(self.target_nonce)
|
||||
.or(self.best_target_nonce)
|
||||
.unwrap_or_default();
|
||||
self.source_queue.extend(
|
||||
nonces
|
||||
@@ -195,7 +195,7 @@ where
|
||||
)
|
||||
}
|
||||
|
||||
fn target_nonces_updated(
|
||||
fn best_target_nonces_updated(
|
||||
&mut self,
|
||||
nonces: TargetClientNonces<()>,
|
||||
race_state: &mut RaceState<
|
||||
@@ -206,8 +206,8 @@ where
|
||||
) {
|
||||
let nonce = nonces.latest_nonce;
|
||||
|
||||
if let Some(target_nonce) = self.target_nonce {
|
||||
if nonce < target_nonce {
|
||||
if let Some(best_target_nonce) = self.best_target_nonce {
|
||||
if nonce < best_target_nonce {
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -241,7 +241,25 @@ where
|
||||
race_state.nonces_submitted = None;
|
||||
}
|
||||
|
||||
self.target_nonce = Some(nonce);
|
||||
self.best_target_nonce = Some(std::cmp::max(
|
||||
self.best_target_nonce.unwrap_or(nonces.latest_nonce),
|
||||
nonce,
|
||||
));
|
||||
}
|
||||
|
||||
fn finalized_target_nonces_updated(
|
||||
&mut self,
|
||||
nonces: TargetClientNonces<()>,
|
||||
_race_state: &mut RaceState<
|
||||
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
|
||||
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
|
||||
Proof,
|
||||
>,
|
||||
) {
|
||||
self.best_target_nonce = Some(std::cmp::max(
|
||||
self.best_target_nonce.unwrap_or(nonces.latest_nonce),
|
||||
nonces.latest_nonce,
|
||||
));
|
||||
}
|
||||
|
||||
fn select_nonces_to_deliver(
|
||||
@@ -302,7 +320,7 @@ mod tests {
|
||||
assert_eq!(strategy.best_at_source(), None);
|
||||
strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
|
||||
assert_eq!(strategy.best_at_source(), None);
|
||||
strategy.target_nonces_updated(target_nonces(10), &mut Default::default());
|
||||
strategy.best_target_nonces_updated(target_nonces(10), &mut Default::default());
|
||||
assert_eq!(strategy.source_queue, vec![]);
|
||||
assert_eq!(strategy.best_at_source(), Some(10));
|
||||
}
|
||||
@@ -310,7 +328,7 @@ mod tests {
|
||||
#[test]
|
||||
fn source_nonce_is_never_lower_than_known_target_nonce() {
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
strategy.target_nonces_updated(target_nonces(10), &mut Default::default());
|
||||
strategy.best_target_nonces_updated(target_nonces(10), &mut Default::default());
|
||||
strategy.source_nonces_updated(header_id(1), source_nonces(1..=5));
|
||||
assert_eq!(strategy.source_queue, vec![]);
|
||||
}
|
||||
@@ -327,11 +345,11 @@ mod tests {
|
||||
#[test]
|
||||
fn target_nonce_is_never_lower_than_latest_known_target_nonce() {
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
assert_eq!(strategy.target_nonce, None);
|
||||
strategy.target_nonces_updated(target_nonces(10), &mut Default::default());
|
||||
assert_eq!(strategy.target_nonce, Some(10));
|
||||
strategy.target_nonces_updated(target_nonces(5), &mut Default::default());
|
||||
assert_eq!(strategy.target_nonce, Some(10));
|
||||
assert_eq!(strategy.best_target_nonce, None);
|
||||
strategy.best_target_nonces_updated(target_nonces(10), &mut Default::default());
|
||||
assert_eq!(strategy.best_target_nonce, Some(10));
|
||||
strategy.best_target_nonces_updated(target_nonces(5), &mut Default::default());
|
||||
assert_eq!(strategy.best_target_nonce, Some(10));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -341,9 +359,9 @@ mod tests {
|
||||
strategy.source_nonces_updated(header_id(2), source_nonces(6..=10));
|
||||
strategy.source_nonces_updated(header_id(3), source_nonces(11..=15));
|
||||
strategy.source_nonces_updated(header_id(4), source_nonces(16..=20));
|
||||
strategy.target_nonces_updated(target_nonces(15), &mut Default::default());
|
||||
strategy.best_target_nonces_updated(target_nonces(15), &mut Default::default());
|
||||
assert_eq!(strategy.source_queue, vec![(header_id(4), 16..=20)]);
|
||||
strategy.target_nonces_updated(target_nonces(17), &mut Default::default());
|
||||
strategy.best_target_nonces_updated(target_nonces(17), &mut Default::default());
|
||||
assert_eq!(strategy.source_queue, vec![(header_id(4), 18..=20)]);
|
||||
}
|
||||
|
||||
@@ -352,9 +370,9 @@ mod tests {
|
||||
let mut state = RaceState::default();
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
state.nonces_to_submit = Some((header_id(1), 5..=10, (5..=10, None)));
|
||||
strategy.target_nonces_updated(target_nonces(7), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(7), &mut state);
|
||||
assert!(state.nonces_to_submit.is_some());
|
||||
strategy.target_nonces_updated(target_nonces(10), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(10), &mut state);
|
||||
assert!(state.nonces_to_submit.is_none());
|
||||
}
|
||||
|
||||
@@ -363,9 +381,9 @@ mod tests {
|
||||
let mut state = RaceState::default();
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
state.nonces_submitted = Some(5..=10);
|
||||
strategy.target_nonces_updated(target_nonces(7), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(7), &mut state);
|
||||
assert!(state.nonces_submitted.is_some());
|
||||
strategy.target_nonces_updated(target_nonces(10), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(10), &mut state);
|
||||
assert!(state.nonces_submitted.is_none());
|
||||
}
|
||||
|
||||
@@ -374,7 +392,7 @@ mod tests {
|
||||
let mut state = RaceState::default();
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
state.nonces_to_submit = Some((header_id(1), 1..=10, (1..=10, None)));
|
||||
strategy.target_nonces_updated(target_nonces(0), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
|
||||
strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
|
||||
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||
}
|
||||
@@ -384,7 +402,7 @@ mod tests {
|
||||
let mut state = RaceState::default();
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
state.nonces_submitted = Some(1..=10);
|
||||
strategy.target_nonces_updated(target_nonces(0), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
|
||||
strategy.source_nonces_updated(header_id(1), source_nonces(1..=10));
|
||||
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||
}
|
||||
@@ -393,7 +411,7 @@ mod tests {
|
||||
fn select_nonces_to_deliver_works() {
|
||||
let mut state = RaceState::<_, _, TestMessagesProof>::default();
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
strategy.target_nonces_updated(target_nonces(0), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
|
||||
strategy.source_nonces_updated(header_id(1), source_nonces(1..=1));
|
||||
strategy.source_nonces_updated(header_id(2), source_nonces(2..=2));
|
||||
strategy.source_nonces_updated(header_id(3), source_nonces(3..=6));
|
||||
@@ -401,12 +419,12 @@ mod tests {
|
||||
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
|
||||
assert_eq!(strategy.select_nonces_to_deliver(&state), Some((1..=6, ())));
|
||||
strategy.target_nonces_updated(target_nonces(6), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(6), &mut state);
|
||||
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(5));
|
||||
assert_eq!(strategy.select_nonces_to_deliver(&state), Some((7..=8, ())));
|
||||
strategy.target_nonces_updated(target_nonces(8), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(8), &mut state);
|
||||
assert_eq!(strategy.select_nonces_to_deliver(&state), None);
|
||||
}
|
||||
|
||||
@@ -414,7 +432,7 @@ mod tests {
|
||||
fn select_nonces_to_deliver_able_to_split_ranges_with_selector() {
|
||||
let mut state = RaceState::<_, _, TestMessagesProof>::default();
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
strategy.target_nonces_updated(target_nonces(0), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
|
||||
strategy.source_nonces_updated(header_id(1), source_nonces(1..=100));
|
||||
|
||||
state.best_finalized_source_header_id_at_source = Some(header_id(1));
|
||||
@@ -433,7 +451,7 @@ mod tests {
|
||||
let mut state = RaceState::<_, _, TestMessagesProof>::default();
|
||||
let mut strategy = BasicStrategy::<TestMessageLane>::new();
|
||||
strategy.source_nonces_updated(header_id(1), source_nonces(1..=100));
|
||||
strategy.target_nonces_updated(target_nonces(50), &mut state);
|
||||
strategy.best_target_nonces_updated(target_nonces(50), &mut state);
|
||||
state.best_finalized_source_header_id_at_source = Some(header_id(1));
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
|
||||
state.best_target_header_id = Some(header_id(1));
|
||||
|
||||
Reference in New Issue
Block a user