mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-06 17:18:03 +00:00
submit lane unblock transactions from relay (#2030)
* submit lane unblock transactions from relay * moved body of select_nonces_to_deliver to the separate select_race_action * extracted latest_confirmed_nonce_at_source method * return Option<RaceAction> from select_race_action * make required_source_header_at_target async * remove extra argument from required_source_header_at_target * small fixes in tests * Revert "return Option<RaceAction> from select_race_action" This reverts commit 9f13dbfae39a5a45564550e8c89b10a524a68729. * implement required_source_header_at_target using what-if approach * fix compilation * fmt * clippy * moved some code to the can_submit_transaction_with
This commit is contained in:
committed by
Bastian Köcher
parent
25220c2ca4
commit
8b262ea60b
@@ -55,7 +55,7 @@ pub trait Chain: ChainBase + Clone {
|
||||
/// Block type.
|
||||
type SignedBlock: Member + Serialize + DeserializeOwned + BlockWithJustification<Self::Header>;
|
||||
/// The aggregated `Call` type.
|
||||
type Call: Clone + Codec + Debug + Send;
|
||||
type Call: Clone + Codec + Debug + Send + Sync;
|
||||
}
|
||||
|
||||
/// Substrate-based relay chain that supports parachains.
|
||||
|
||||
@@ -91,7 +91,7 @@ impl<AccountId> TaggedAccount<AccountId> {
|
||||
}
|
||||
|
||||
/// Batch call builder.
|
||||
pub trait BatchCallBuilder<Call>: Clone + Send {
|
||||
pub trait BatchCallBuilder<Call>: Clone + Send + Sync {
|
||||
/// Create batch call from given calls vector.
|
||||
fn build_batch_call(&self, _calls: Vec<Call>) -> Call;
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
|
||||
[dependencies]
|
||||
async-std = { version = "1.6.5", features = ["attributes"] }
|
||||
async-trait = "0.1"
|
||||
env_logger = "0.10"
|
||||
futures = "0.3.28"
|
||||
hex = "0.4"
|
||||
log = "0.4.17"
|
||||
|
||||
@@ -111,7 +111,7 @@ pub struct NoncesSubmitArtifacts<T> {
|
||||
|
||||
/// Batch transaction that already submit some headers and needs to be extended with
|
||||
/// messages/delivery proof before sending.
|
||||
pub trait BatchTransaction<HeaderId>: Debug + Send {
|
||||
pub trait BatchTransaction<HeaderId>: Debug + Send + Sync {
|
||||
/// Header that was required in the original call and which is bundled within this
|
||||
/// batch transaction.
|
||||
fn required_header_id(&self) -> HeaderId;
|
||||
@@ -622,11 +622,19 @@ pub(crate) mod tests {
|
||||
}
|
||||
|
||||
impl TestClientData {
|
||||
fn receive_messages(&mut self, proof: TestMessagesProof) {
|
||||
fn receive_messages(
|
||||
&mut self,
|
||||
maybe_batch_tx: Option<TestMessagesBatchTransaction>,
|
||||
proof: TestMessagesProof,
|
||||
) {
|
||||
self.target_state.best_self =
|
||||
HeaderId(self.target_state.best_self.0 + 1, self.target_state.best_self.1 + 1);
|
||||
self.target_state.best_finalized_self = self.target_state.best_self;
|
||||
self.target_latest_received_nonce = *proof.0.end();
|
||||
if let Some(maybe_batch_tx) = maybe_batch_tx {
|
||||
self.target_state.best_finalized_peer_at_best_self =
|
||||
Some(maybe_batch_tx.required_header_id());
|
||||
}
|
||||
if let Some(target_latest_confirmed_received_nonce) = proof.1 {
|
||||
self.target_latest_confirmed_received_nonce =
|
||||
target_latest_confirmed_received_nonce;
|
||||
@@ -634,10 +642,18 @@ pub(crate) mod tests {
|
||||
self.submitted_messages_proofs.push(proof);
|
||||
}
|
||||
|
||||
fn receive_messages_delivery_proof(&mut self, proof: TestMessagesReceivingProof) {
|
||||
fn receive_messages_delivery_proof(
|
||||
&mut self,
|
||||
maybe_batch_tx: Option<TestConfirmationBatchTransaction>,
|
||||
proof: TestMessagesReceivingProof,
|
||||
) {
|
||||
self.source_state.best_self =
|
||||
HeaderId(self.source_state.best_self.0 + 1, self.source_state.best_self.1 + 1);
|
||||
self.source_state.best_finalized_self = self.source_state.best_self;
|
||||
if let Some(maybe_batch_tx) = maybe_batch_tx {
|
||||
self.source_state.best_finalized_peer_at_best_self =
|
||||
Some(maybe_batch_tx.required_header_id());
|
||||
}
|
||||
self.submitted_messages_receiving_proofs.push(proof);
|
||||
self.source_latest_confirmed_received_nonce = proof;
|
||||
}
|
||||
@@ -760,13 +776,13 @@ pub(crate) mod tests {
|
||||
|
||||
async fn submit_messages_receiving_proof(
|
||||
&self,
|
||||
_maybe_batch_tx: Option<Self::BatchTransaction>,
|
||||
maybe_batch_tx: Option<Self::BatchTransaction>,
|
||||
_generated_at_block: TargetHeaderIdOf<TestMessageLane>,
|
||||
proof: TestMessagesReceivingProof,
|
||||
) -> Result<Self::TransactionTracker, TestError> {
|
||||
let mut data = self.data.lock();
|
||||
(self.tick)(&mut data);
|
||||
data.receive_messages_delivery_proof(proof);
|
||||
data.receive_messages_delivery_proof(maybe_batch_tx, proof);
|
||||
(self.post_tick)(&mut data);
|
||||
Ok(TestTransactionTracker(data.source_tracked_transaction_status))
|
||||
}
|
||||
@@ -885,7 +901,7 @@ pub(crate) mod tests {
|
||||
|
||||
async fn submit_messages_proof(
|
||||
&self,
|
||||
_maybe_batch_tx: Option<Self::BatchTransaction>,
|
||||
maybe_batch_tx: Option<Self::BatchTransaction>,
|
||||
_generated_at_header: SourceHeaderIdOf<TestMessageLane>,
|
||||
nonces: RangeInclusive<MessageNonce>,
|
||||
proof: TestMessagesProof,
|
||||
@@ -895,7 +911,7 @@ pub(crate) mod tests {
|
||||
if data.is_target_fails {
|
||||
return Err(TestError)
|
||||
}
|
||||
data.receive_messages(proof);
|
||||
data.receive_messages(maybe_batch_tx, proof);
|
||||
(self.post_tick)(&mut data);
|
||||
Ok(NoncesSubmitArtifacts {
|
||||
nonces,
|
||||
|
||||
@@ -290,7 +290,185 @@ impl<P: MessageLane, SC, TC> std::fmt::Debug for MessageDeliveryStrategy<P, SC,
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: MessageLane, SC, TC> MessageDeliveryStrategy<P, SC, TC> {
|
||||
impl<P: MessageLane, SC, TC> MessageDeliveryStrategy<P, SC, TC>
|
||||
where
|
||||
P: MessageLane,
|
||||
SC: MessageLaneSourceClient<P>,
|
||||
TC: MessageLaneTargetClient<P>,
|
||||
{
|
||||
/// Returns true if some race action can be selected (with `select_race_action`) at given
|
||||
/// `best_finalized_source_header_id_at_best_target` source header at target.
|
||||
async fn can_submit_transaction_with<
|
||||
RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>,
|
||||
>(
|
||||
&self,
|
||||
mut race_state: RS,
|
||||
maybe_best_finalized_source_header_id_at_best_target: Option<SourceHeaderIdOf<P>>,
|
||||
) -> bool {
|
||||
if let Some(best_finalized_source_header_id_at_best_target) =
|
||||
maybe_best_finalized_source_header_id_at_best_target
|
||||
{
|
||||
race_state.set_best_finalized_source_header_id_at_best_target(
|
||||
best_finalized_source_header_id_at_best_target,
|
||||
);
|
||||
|
||||
return self.select_race_action(race_state).await.is_some()
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
async fn select_race_action<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
|
||||
&self,
|
||||
race_state: RS,
|
||||
) -> Option<(RangeInclusive<MessageNonce>, MessageProofParameters)> {
|
||||
let best_target_nonce = self.strategy.best_at_target()?;
|
||||
let best_finalized_source_header_id_at_best_target =
|
||||
race_state.best_finalized_source_header_id_at_best_target()?;
|
||||
let latest_confirmed_nonce_at_source = self
|
||||
.latest_confirmed_nonce_at_source(&best_finalized_source_header_id_at_best_target)
|
||||
.unwrap_or(best_target_nonce);
|
||||
let target_nonces = self.target_nonces.as_ref()?;
|
||||
|
||||
// There's additional condition in the message delivery race: target would reject messages
|
||||
// if there are too much unconfirmed messages at the inbound lane.
|
||||
|
||||
// The receiving race is responsible to deliver confirmations back to the source chain. So
|
||||
// if there's a lot of unconfirmed messages, let's wait until it'll be able to do its job.
|
||||
let latest_received_nonce_at_target = target_nonces.latest_nonce;
|
||||
let confirmations_missing =
|
||||
latest_received_nonce_at_target.checked_sub(latest_confirmed_nonce_at_source);
|
||||
match confirmations_missing {
|
||||
Some(confirmations_missing)
|
||||
if confirmations_missing >= self.max_unconfirmed_nonces_at_target =>
|
||||
{
|
||||
log::debug!(
|
||||
target: "bridge",
|
||||
"Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \
|
||||
at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}",
|
||||
MessageDeliveryRace::<P>::source_name(),
|
||||
MessageDeliveryRace::<P>::target_name(),
|
||||
latest_received_nonce_at_target,
|
||||
latest_confirmed_nonce_at_source,
|
||||
self.max_unconfirmed_nonces_at_target,
|
||||
);
|
||||
|
||||
return None
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
|
||||
// Ok - we may have new nonces to deliver. But target may still reject new messages, because
|
||||
// we haven't notified it that (some) messages have been confirmed. So we may want to
|
||||
// include updated `source.latest_confirmed` in the proof.
|
||||
//
|
||||
// Important note: we're including outbound state lane proof whenever there are unconfirmed
|
||||
// nonces on the target chain. Other strategy is to include it only if it's absolutely
|
||||
// necessary.
|
||||
let latest_confirmed_nonce_at_target = target_nonces.nonces_data.confirmed_nonce;
|
||||
let outbound_state_proof_required =
|
||||
latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source;
|
||||
|
||||
// The target node would also reject messages if there are too many entries in the
|
||||
// "unrewarded relayers" set. If we are unable to prove new rewards to the target node, then
|
||||
// we should wait for confirmations race.
|
||||
let unrewarded_limit_reached =
|
||||
target_nonces.nonces_data.unrewarded_relayers.unrewarded_relayer_entries >=
|
||||
self.max_unrewarded_relayer_entries_at_target ||
|
||||
target_nonces.nonces_data.unrewarded_relayers.total_messages >=
|
||||
self.max_unconfirmed_nonces_at_target;
|
||||
if unrewarded_limit_reached {
|
||||
// so there are already too many unrewarded relayer entries in the set
|
||||
//
|
||||
// => check if we can prove enough rewards. If not, we should wait for more rewards to
|
||||
// be paid
|
||||
let number_of_rewards_being_proved =
|
||||
latest_confirmed_nonce_at_source.saturating_sub(latest_confirmed_nonce_at_target);
|
||||
let enough_rewards_being_proved = number_of_rewards_being_proved >=
|
||||
target_nonces.nonces_data.unrewarded_relayers.messages_in_oldest_entry;
|
||||
if !enough_rewards_being_proved {
|
||||
return None
|
||||
}
|
||||
}
|
||||
|
||||
// If we're here, then the confirmations race did its job && sending side now knows that
|
||||
// messages have been delivered. Now let's select nonces that we want to deliver.
|
||||
//
|
||||
// We may deliver at most:
|
||||
//
|
||||
// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
|
||||
// latest_confirmed_nonce_at_target)
|
||||
//
|
||||
// messages in the batch. But since we're including outbound state proof in the batch, then
|
||||
// it may be increased to:
|
||||
//
|
||||
// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
|
||||
// latest_confirmed_nonce_at_source)
|
||||
let future_confirmed_nonce_at_target = if outbound_state_proof_required {
|
||||
latest_confirmed_nonce_at_source
|
||||
} else {
|
||||
latest_confirmed_nonce_at_target
|
||||
};
|
||||
let max_nonces = latest_received_nonce_at_target
|
||||
.checked_sub(future_confirmed_nonce_at_target)
|
||||
.and_then(|diff| self.max_unconfirmed_nonces_at_target.checked_sub(diff))
|
||||
.unwrap_or_default();
|
||||
let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch);
|
||||
let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch;
|
||||
let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch;
|
||||
let lane_source_client = self.lane_source_client.clone();
|
||||
let lane_target_client = self.lane_target_client.clone();
|
||||
|
||||
// select nonces from nonces, available for delivery
|
||||
let selected_nonces = match self.strategy.available_source_queue_indices(race_state) {
|
||||
Some(available_source_queue_indices) => {
|
||||
let source_queue = self.strategy.source_queue();
|
||||
let reference = RelayMessagesBatchReference {
|
||||
max_messages_in_this_batch: max_nonces,
|
||||
max_messages_weight_in_single_batch,
|
||||
max_messages_size_in_single_batch,
|
||||
lane_source_client: lane_source_client.clone(),
|
||||
lane_target_client: lane_target_client.clone(),
|
||||
best_target_nonce,
|
||||
nonces_queue: source_queue.clone(),
|
||||
nonces_queue_range: available_source_queue_indices,
|
||||
metrics: self.metrics_msg.clone(),
|
||||
};
|
||||
|
||||
MessageRaceLimits::decide(reference).await
|
||||
},
|
||||
None => {
|
||||
// we still may need to submit delivery transaction with zero messages to
|
||||
// unblock the lane. But it'll only be accepted if the lane is blocked
|
||||
// (i.e. when `unrewarded_limit_reached` is `true`)
|
||||
None
|
||||
},
|
||||
};
|
||||
|
||||
// check if we need unblocking transaction and we may submit it
|
||||
#[allow(clippy::reversed_empty_ranges)]
|
||||
let selected_nonces = match selected_nonces {
|
||||
Some(selected_nonces) => selected_nonces,
|
||||
None if unrewarded_limit_reached && outbound_state_proof_required => 1..=0,
|
||||
_ => return None,
|
||||
};
|
||||
|
||||
let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);
|
||||
Some((
|
||||
selected_nonces,
|
||||
MessageProofParameters { outbound_state_proof_required, dispatch_weight },
|
||||
))
|
||||
}
|
||||
|
||||
/// Returns lastest confirmed message at source chain, given source block.
|
||||
fn latest_confirmed_nonce_at_source(&self, at: &SourceHeaderIdOf<P>) -> Option<MessageNonce> {
|
||||
self.latest_confirmed_nonces_at_source
|
||||
.iter()
|
||||
.take_while(|(id, _)| id.0 <= at.0)
|
||||
.last()
|
||||
.map(|(_, nonce)| *nonce)
|
||||
}
|
||||
|
||||
/// Returns total weight of all undelivered messages.
|
||||
fn dispatch_weight_for_range(&self, range: &RangeInclusive<MessageNonce>) -> Weight {
|
||||
self.strategy
|
||||
@@ -322,9 +500,10 @@ where
|
||||
self.strategy.is_empty()
|
||||
}
|
||||
|
||||
fn required_source_header_at_target<RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>>(
|
||||
async fn required_source_header_at_target<
|
||||
RS: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>>,
|
||||
>(
|
||||
&self,
|
||||
current_best: &SourceHeaderIdOf<P>,
|
||||
race_state: RS,
|
||||
) -> Option<SourceHeaderIdOf<P>> {
|
||||
// we have already submitted something - let's wait until it is mined
|
||||
@@ -332,32 +511,41 @@ where
|
||||
return None
|
||||
}
|
||||
|
||||
let has_nonces_to_deliver = !self.strategy.is_empty();
|
||||
let header_required_for_messages_delivery =
|
||||
self.strategy.required_source_header_at_target(current_best, race_state);
|
||||
let header_required_for_reward_confirmations_delivery = self
|
||||
.latest_confirmed_nonces_at_source
|
||||
.back()
|
||||
.filter(|(id, nonce)| *nonce != 0 && id.0 > current_best.0)
|
||||
.map(|(id, _)| id.clone());
|
||||
match (
|
||||
has_nonces_to_deliver,
|
||||
header_required_for_messages_delivery,
|
||||
header_required_for_reward_confirmations_delivery,
|
||||
) {
|
||||
// if we need to delver messages and proof-of-delivery-confirmations, then we need to
|
||||
// select the most recent header to avoid extra roundtrips
|
||||
(true, Some(id1), Some(id2)) => Some(if id1.0 > id2.0 { id1 } else { id2 }),
|
||||
// if we only need to deliver messages - fine, let's require some source header
|
||||
//
|
||||
// if we need new header for proof-of-delivery-confirmations - let's also ask for that.
|
||||
// Even though it may require additional header, we'll be sure that we won't block the
|
||||
// lane (sometimes we can't deliver messages without proof-of-delivery-confirmations)
|
||||
(true, a, b) => a.or(b),
|
||||
// we never submit delivery transaction without messages, so if `has_nonces_to_deliver`
|
||||
// if `false`, we don't need any source headers at target
|
||||
(false, _, _) => None,
|
||||
// if we can deliver something using current race state, go on
|
||||
let selected_nonces = self.select_race_action(race_state.clone()).await;
|
||||
if selected_nonces.is_some() {
|
||||
return None
|
||||
}
|
||||
|
||||
// check if we may deliver some messages if we'll relay require source header
|
||||
// to target first
|
||||
let maybe_source_header_for_delivery =
|
||||
self.strategy.source_queue().back().map(|(id, _)| id.clone());
|
||||
if self
|
||||
.can_submit_transaction_with(
|
||||
race_state.clone(),
|
||||
maybe_source_header_for_delivery.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
return maybe_source_header_for_delivery
|
||||
}
|
||||
|
||||
// ok, we can't delivery anything even if we relay some source blocks first. But maybe
|
||||
// the lane is blocked and we need to submit unblock transaction?
|
||||
let maybe_source_header_for_reward_confirmation =
|
||||
self.latest_confirmed_nonces_at_source.back().map(|(id, _)| id.clone());
|
||||
if self
|
||||
.can_submit_transaction_with(
|
||||
race_state.clone(),
|
||||
maybe_source_header_for_reward_confirmation.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
return maybe_source_header_for_reward_confirmation
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn best_at_source(&self) -> Option<MessageNonce> {
|
||||
@@ -436,128 +624,7 @@ where
|
||||
&self,
|
||||
race_state: RS,
|
||||
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
|
||||
let best_target_nonce = self.strategy.best_at_target()?;
|
||||
let best_finalized_source_header_id_at_best_target =
|
||||
race_state.best_finalized_source_header_id_at_best_target()?;
|
||||
let latest_confirmed_nonce_at_source = self
|
||||
.latest_confirmed_nonces_at_source
|
||||
.iter()
|
||||
.take_while(|(id, _)| id.0 <= best_finalized_source_header_id_at_best_target.0)
|
||||
.last()
|
||||
.map(|(_, nonce)| *nonce)
|
||||
.unwrap_or(best_target_nonce);
|
||||
let target_nonces = self.target_nonces.as_ref()?;
|
||||
|
||||
// There's additional condition in the message delivery race: target would reject messages
|
||||
// if there are too much unconfirmed messages at the inbound lane.
|
||||
|
||||
// The receiving race is responsible to deliver confirmations back to the source chain. So
|
||||
// if there's a lot of unconfirmed messages, let's wait until it'll be able to do its job.
|
||||
let latest_received_nonce_at_target = target_nonces.latest_nonce;
|
||||
let confirmations_missing =
|
||||
latest_received_nonce_at_target.checked_sub(latest_confirmed_nonce_at_source);
|
||||
match confirmations_missing {
|
||||
Some(confirmations_missing)
|
||||
if confirmations_missing >= self.max_unconfirmed_nonces_at_target =>
|
||||
{
|
||||
log::debug!(
|
||||
target: "bridge",
|
||||
"Cannot deliver any more messages from {} to {}. Too many unconfirmed nonces \
|
||||
at target: target.latest_received={:?}, source.latest_confirmed={:?}, max={:?}",
|
||||
MessageDeliveryRace::<P>::source_name(),
|
||||
MessageDeliveryRace::<P>::target_name(),
|
||||
latest_received_nonce_at_target,
|
||||
latest_confirmed_nonce_at_source,
|
||||
self.max_unconfirmed_nonces_at_target,
|
||||
);
|
||||
|
||||
return None
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
|
||||
// Ok - we may have new nonces to deliver. But target may still reject new messages, because
|
||||
// we haven't notified it that (some) messages have been confirmed. So we may want to
|
||||
// include updated `source.latest_confirmed` in the proof.
|
||||
//
|
||||
// Important note: we're including outbound state lane proof whenever there are unconfirmed
|
||||
// nonces on the target chain. Other strategy is to include it only if it's absolutely
|
||||
// necessary.
|
||||
let latest_confirmed_nonce_at_target = target_nonces.nonces_data.confirmed_nonce;
|
||||
let outbound_state_proof_required =
|
||||
latest_confirmed_nonce_at_target < latest_confirmed_nonce_at_source;
|
||||
|
||||
// The target node would also reject messages if there are too many entries in the
|
||||
// "unrewarded relayers" set. If we are unable to prove new rewards to the target node, then
|
||||
// we should wait for confirmations race.
|
||||
let unrewarded_relayer_entries_limit_reached =
|
||||
target_nonces.nonces_data.unrewarded_relayers.unrewarded_relayer_entries >=
|
||||
self.max_unrewarded_relayer_entries_at_target;
|
||||
if unrewarded_relayer_entries_limit_reached {
|
||||
// so there are already too many unrewarded relayer entries in the set
|
||||
//
|
||||
// => check if we can prove enough rewards. If not, we should wait for more rewards to
|
||||
// be paid
|
||||
let number_of_rewards_being_proved =
|
||||
latest_confirmed_nonce_at_source.saturating_sub(latest_confirmed_nonce_at_target);
|
||||
let enough_rewards_being_proved = number_of_rewards_being_proved >=
|
||||
target_nonces.nonces_data.unrewarded_relayers.messages_in_oldest_entry;
|
||||
if !enough_rewards_being_proved {
|
||||
return None
|
||||
}
|
||||
}
|
||||
|
||||
// If we're here, then the confirmations race did its job && sending side now knows that
|
||||
// messages have been delivered. Now let's select nonces that we want to deliver.
|
||||
//
|
||||
// We may deliver at most:
|
||||
//
|
||||
// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
|
||||
// latest_confirmed_nonce_at_target)
|
||||
//
|
||||
// messages in the batch. But since we're including outbound state proof in the batch, then
|
||||
// it may be increased to:
|
||||
//
|
||||
// max_unconfirmed_nonces_at_target - (latest_received_nonce_at_target -
|
||||
// latest_confirmed_nonce_at_source)
|
||||
let future_confirmed_nonce_at_target = if outbound_state_proof_required {
|
||||
latest_confirmed_nonce_at_source
|
||||
} else {
|
||||
latest_confirmed_nonce_at_target
|
||||
};
|
||||
let max_nonces = latest_received_nonce_at_target
|
||||
.checked_sub(future_confirmed_nonce_at_target)
|
||||
.and_then(|diff| self.max_unconfirmed_nonces_at_target.checked_sub(diff))
|
||||
.unwrap_or_default();
|
||||
let max_nonces = std::cmp::min(max_nonces, self.max_messages_in_single_batch);
|
||||
let max_messages_weight_in_single_batch = self.max_messages_weight_in_single_batch;
|
||||
let max_messages_size_in_single_batch = self.max_messages_size_in_single_batch;
|
||||
let lane_source_client = self.lane_source_client.clone();
|
||||
let lane_target_client = self.lane_target_client.clone();
|
||||
|
||||
let available_source_queue_indices =
|
||||
self.strategy.available_source_queue_indices(race_state)?;
|
||||
let source_queue = self.strategy.source_queue();
|
||||
|
||||
let reference = RelayMessagesBatchReference {
|
||||
max_messages_in_this_batch: max_nonces,
|
||||
max_messages_weight_in_single_batch,
|
||||
max_messages_size_in_single_batch,
|
||||
lane_source_client: lane_source_client.clone(),
|
||||
lane_target_client: lane_target_client.clone(),
|
||||
best_target_nonce,
|
||||
nonces_queue: source_queue.clone(),
|
||||
nonces_queue_range: available_source_queue_indices,
|
||||
metrics: self.metrics_msg.clone(),
|
||||
};
|
||||
|
||||
let selected_nonces = MessageRaceLimits::decide(reference).await?;
|
||||
let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);
|
||||
|
||||
Some((
|
||||
selected_nonces,
|
||||
MessageProofParameters { outbound_state_proof_required, dispatch_weight },
|
||||
))
|
||||
self.select_race_action(race_state).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -980,31 +1047,41 @@ mod tests {
|
||||
);
|
||||
// nothing needs to be delivered now and we don't need any new headers
|
||||
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
|
||||
assert_eq!(strategy.required_source_header_at_target(&header_id(1), state.clone()), None);
|
||||
assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
|
||||
|
||||
// now let's generate two more nonces [24; 25] at the soruce;
|
||||
strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 19, 0));
|
||||
//
|
||||
// - so now we'll need to relay source block#2 to be able to accept messages [24; 25].
|
||||
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
|
||||
assert_eq!(
|
||||
strategy.required_source_header_at_target(&header_id(1), state.clone()),
|
||||
Some(header_id(2))
|
||||
);
|
||||
|
||||
// let's relay source block#2
|
||||
// block#2 is generated
|
||||
state.best_finalized_source_header_id_at_source = Some(header_id(2));
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
|
||||
state.best_target_header_id = Some(header_id(2));
|
||||
state.best_finalized_target_header_id = Some(header_id(2));
|
||||
|
||||
// now let's generate two more nonces [24; 25] at the source;
|
||||
strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 19, 0));
|
||||
//
|
||||
// we don't need to relay more headers to target, because messages [20; 23] have
|
||||
// not confirmed to source yet
|
||||
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
|
||||
assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
|
||||
|
||||
// let's relay source block#3
|
||||
state.best_finalized_source_header_id_at_source = Some(header_id(3));
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(3));
|
||||
state.best_target_header_id = Some(header_id(3));
|
||||
state.best_finalized_target_header_id = Some(header_id(3));
|
||||
|
||||
// and ask strategy again => still nothing to deliver, because parallel confirmations
|
||||
// race need to be pushed further
|
||||
assert_eq!(strategy.select_nonces_to_deliver(state.clone()).await, None);
|
||||
assert_eq!(strategy.required_source_header_at_target(&header_id(2), state.clone()), None);
|
||||
assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
|
||||
|
||||
// let's relay source block#3
|
||||
state.best_finalized_source_header_id_at_source = Some(header_id(4));
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
|
||||
state.best_target_header_id = Some(header_id(4));
|
||||
state.best_finalized_target_header_id = Some(header_id(4));
|
||||
|
||||
// let's confirm messages [20; 23]
|
||||
strategy.source_nonces_updated(header_id(2), source_nonces(24..=25, 23, 0));
|
||||
strategy.source_nonces_updated(header_id(4), source_nonces(24..=25, 23, 0));
|
||||
|
||||
// and ask strategy again => now we have everything required to deliver remaining
|
||||
// [24; 25] nonces and proof of [20; 23] confirmation
|
||||
@@ -1012,7 +1089,7 @@ mod tests {
|
||||
strategy.select_nonces_to_deliver(state.clone()).await,
|
||||
Some(((24..=25), proof_parameters(true, 2))),
|
||||
);
|
||||
assert_eq!(strategy.required_source_header_at_target(&header_id(2), state), None);
|
||||
assert_eq!(strategy.required_source_header_at_target(state).await, None);
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
@@ -1041,9 +1118,9 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[async_std::test]
|
||||
#[allow(clippy::reversed_empty_ranges)]
|
||||
fn no_source_headers_required_at_target_if_lanes_are_empty() {
|
||||
async fn no_source_headers_required_at_target_if_lanes_are_empty() {
|
||||
let (state, _) = prepare_strategy();
|
||||
let mut strategy = TestStrategy {
|
||||
max_unrewarded_relayer_entries_at_target: 4,
|
||||
@@ -1073,7 +1150,7 @@ mod tests {
|
||||
strategy.latest_confirmed_nonces_at_source,
|
||||
VecDeque::from([(source_header_id, 0)])
|
||||
);
|
||||
assert_eq!(strategy.required_source_header_at_target(&source_header_id, state), None);
|
||||
assert_eq!(strategy.required_source_header_at_target(state).await, None);
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
@@ -1159,4 +1236,138 @@ mod tests {
|
||||
)),
|
||||
);
|
||||
}
|
||||
|
||||
#[async_std::test]
|
||||
#[allow(clippy::reversed_empty_ranges)]
|
||||
async fn delivery_race_is_able_to_unblock_lane() {
|
||||
// step 1: messages 20..=23 are delivered from source to target at target block 2
|
||||
fn at_target_block_2_deliver_messages(
|
||||
strategy: &mut TestStrategy,
|
||||
state: &mut TestRaceState,
|
||||
occupied_relayer_slots: MessageNonce,
|
||||
occupied_message_slots: MessageNonce,
|
||||
) {
|
||||
let nonces_at_target = TargetClientNonces {
|
||||
latest_nonce: 23,
|
||||
nonces_data: DeliveryRaceTargetNoncesData {
|
||||
confirmed_nonce: 19,
|
||||
unrewarded_relayers: UnrewardedRelayersState {
|
||||
unrewarded_relayer_entries: occupied_relayer_slots,
|
||||
total_messages: occupied_message_slots,
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
state.best_target_header_id = Some(header_id(2));
|
||||
state.best_finalized_target_header_id = Some(header_id(2));
|
||||
|
||||
strategy.best_target_nonces_updated(nonces_at_target.clone(), state);
|
||||
strategy.finalized_target_nonces_updated(nonces_at_target, state);
|
||||
}
|
||||
|
||||
// step 2: delivery of messages 20..=23 is confirmed to the source node at source block 2
|
||||
fn at_source_block_2_deliver_confirmations(
|
||||
strategy: &mut TestStrategy,
|
||||
state: &mut TestRaceState,
|
||||
) {
|
||||
state.best_finalized_source_header_id_at_source = Some(header_id(2));
|
||||
|
||||
strategy.source_nonces_updated(
|
||||
header_id(2),
|
||||
SourceClientNonces { new_nonces: Default::default(), confirmed_nonce: Some(23) },
|
||||
);
|
||||
}
|
||||
|
||||
// step 3: finalize source block 2 at target block 3 and select nonces to deliver
|
||||
async fn at_target_block_3_select_nonces_to_deliver(
|
||||
strategy: &TestStrategy,
|
||||
mut state: TestRaceState,
|
||||
) -> Option<(RangeInclusive<MessageNonce>, MessageProofParameters)> {
|
||||
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
|
||||
state.best_target_header_id = Some(header_id(3));
|
||||
state.best_finalized_target_header_id = Some(header_id(3));
|
||||
|
||||
strategy.select_nonces_to_deliver(state).await
|
||||
}
|
||||
|
||||
let max_unrewarded_relayer_entries_at_target = 4;
|
||||
let max_unconfirmed_nonces_at_target = 4;
|
||||
let expected_rewards_proof = Some((
|
||||
1..=0,
|
||||
MessageProofParameters {
|
||||
outbound_state_proof_required: true,
|
||||
dispatch_weight: Weight::zero(),
|
||||
},
|
||||
));
|
||||
|
||||
// TODO: also fix + test `required_source_header_at_target`
|
||||
|
||||
// when lane is NOT blocked
|
||||
let (mut state, mut strategy) = prepare_strategy();
|
||||
at_target_block_2_deliver_messages(
|
||||
&mut strategy,
|
||||
&mut state,
|
||||
max_unrewarded_relayer_entries_at_target - 1,
|
||||
max_unconfirmed_nonces_at_target - 1,
|
||||
);
|
||||
at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
|
||||
assert_eq!(strategy.required_source_header_at_target(state.clone()).await, None);
|
||||
assert_eq!(at_target_block_3_select_nonces_to_deliver(&strategy, state).await, None);
|
||||
|
||||
// when lane is blocked by no-relayer-slots in unrewarded relayers vector
|
||||
let (mut state, mut strategy) = prepare_strategy();
|
||||
at_target_block_2_deliver_messages(
|
||||
&mut strategy,
|
||||
&mut state,
|
||||
max_unrewarded_relayer_entries_at_target,
|
||||
max_unconfirmed_nonces_at_target - 1,
|
||||
);
|
||||
at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
|
||||
assert_eq!(
|
||||
strategy.required_source_header_at_target(state.clone()).await,
|
||||
Some(header_id(2))
|
||||
);
|
||||
assert_eq!(
|
||||
at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
|
||||
expected_rewards_proof
|
||||
);
|
||||
|
||||
// when lane is blocked by no-message-slots in unrewarded relayers vector
|
||||
let (mut state, mut strategy) = prepare_strategy();
|
||||
at_target_block_2_deliver_messages(
|
||||
&mut strategy,
|
||||
&mut state,
|
||||
max_unrewarded_relayer_entries_at_target - 1,
|
||||
max_unconfirmed_nonces_at_target,
|
||||
);
|
||||
at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
|
||||
assert_eq!(
|
||||
strategy.required_source_header_at_target(state.clone()).await,
|
||||
Some(header_id(2))
|
||||
);
|
||||
assert_eq!(
|
||||
at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
|
||||
expected_rewards_proof
|
||||
);
|
||||
|
||||
// when lane is blocked by no-message-slots and no-message-slots in unrewarded relayers
|
||||
// vector
|
||||
let (mut state, mut strategy) = prepare_strategy();
|
||||
at_target_block_2_deliver_messages(
|
||||
&mut strategy,
|
||||
&mut state,
|
||||
max_unrewarded_relayer_entries_at_target - 1,
|
||||
max_unconfirmed_nonces_at_target,
|
||||
);
|
||||
at_source_block_2_deliver_confirmations(&mut strategy, &mut state);
|
||||
assert_eq!(
|
||||
strategy.required_source_header_at_target(state.clone()).await,
|
||||
Some(header_id(2))
|
||||
);
|
||||
assert_eq!(
|
||||
at_target_block_3_select_nonces_to_deliver(&strategy, state).await,
|
||||
expected_rewards_proof
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,14 +41,14 @@ use std::{
|
||||
/// One of races within lane.
|
||||
pub trait MessageRace {
|
||||
/// Header id of the race source.
|
||||
type SourceHeaderId: Debug + Clone + PartialEq + Send;
|
||||
type SourceHeaderId: Debug + Clone + PartialEq + Send + Sync;
|
||||
/// Header id of the race source.
|
||||
type TargetHeaderId: Debug + Clone + PartialEq + Send;
|
||||
type TargetHeaderId: Debug + Clone + PartialEq + Send + Sync;
|
||||
|
||||
/// Message nonce used in the race.
|
||||
type MessageNonce: Debug + Clone;
|
||||
/// Proof that is generated and delivered in this race.
|
||||
type Proof: Debug + Clone + Send;
|
||||
type Proof: Debug + Clone + Send + Sync;
|
||||
|
||||
/// Name of the race source.
|
||||
fn source_name() -> String;
|
||||
@@ -175,9 +175,8 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
|
||||
/// Should return true if nothing has to be synced.
|
||||
fn is_empty(&self) -> bool;
|
||||
/// Return id of source header that is required to be on target to continue synchronization.
|
||||
fn required_source_header_at_target<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
|
||||
async fn required_source_header_at_target<RS: RaceState<SourceHeaderId, TargetHeaderId>>(
|
||||
&self,
|
||||
current_best: &SourceHeaderId,
|
||||
race_state: RS,
|
||||
) -> Option<SourceHeaderId>;
|
||||
/// Return the best nonce at source node.
|
||||
@@ -218,7 +217,11 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
|
||||
}
|
||||
|
||||
/// State of the race.
|
||||
pub trait RaceState<SourceHeaderId, TargetHeaderId>: Send {
|
||||
pub trait RaceState<SourceHeaderId, TargetHeaderId>: Clone + Send + Sync {
|
||||
/// Set best finalized source header id at the best block on the target
|
||||
/// client (at the `best_finalized_source_header_id_at_best_target`).
|
||||
fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId);
|
||||
|
||||
/// Best finalized source header id at the source client.
|
||||
fn best_finalized_source_header_id_at_source(&self) -> Option<SourceHeaderId>;
|
||||
/// Best finalized source header id at the best block on the target
|
||||
@@ -281,11 +284,15 @@ impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> Default
|
||||
impl<SourceHeaderId, TargetHeaderId, Proof, BatchTx> RaceState<SourceHeaderId, TargetHeaderId>
|
||||
for RaceStateImpl<SourceHeaderId, TargetHeaderId, Proof, BatchTx>
|
||||
where
|
||||
SourceHeaderId: Clone + Send,
|
||||
TargetHeaderId: Clone + Send,
|
||||
Proof: Clone + Send,
|
||||
BatchTx: Clone + Send,
|
||||
SourceHeaderId: Clone + Send + Sync,
|
||||
TargetHeaderId: Clone + Send + Sync,
|
||||
Proof: Clone + Send + Sync,
|
||||
BatchTx: Clone + Send + Sync,
|
||||
{
|
||||
fn set_best_finalized_source_header_id_at_best_target(&mut self, id: SourceHeaderId) {
|
||||
self.best_finalized_source_header_id_at_best_target = Some(id);
|
||||
}
|
||||
|
||||
fn best_finalized_source_header_id_at_source(&self) -> Option<SourceHeaderId> {
|
||||
self.best_finalized_source_header_id_at_source.clone()
|
||||
}
|
||||
@@ -430,10 +437,9 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
|
||||
).fail_if_connection_error(FailedClient::Source)?;
|
||||
|
||||
// ask for more headers if we have nonces to deliver and required headers are missing
|
||||
source_required_header = race_state
|
||||
.best_finalized_source_header_id_at_best_target
|
||||
.as_ref()
|
||||
.and_then(|best| strategy.required_source_header_at_target(best, race_state.clone()));
|
||||
source_required_header = strategy
|
||||
.required_source_header_at_target(race_state.clone())
|
||||
.await;
|
||||
},
|
||||
nonces = target_best_nonces => {
|
||||
target_best_nonces_required = false;
|
||||
|
||||
@@ -205,16 +205,16 @@ impl<
|
||||
self.source_queue.is_empty()
|
||||
}
|
||||
|
||||
fn required_source_header_at_target<
|
||||
async fn required_source_header_at_target<
|
||||
RS: RaceState<
|
||||
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
|
||||
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
|
||||
>,
|
||||
>(
|
||||
&self,
|
||||
current_best: &HeaderId<SourceHeaderHash, SourceHeaderNumber>,
|
||||
_race_state: RS,
|
||||
race_state: RS,
|
||||
) -> Option<HeaderId<SourceHeaderHash, SourceHeaderNumber>> {
|
||||
let current_best = race_state.best_finalized_source_header_id_at_best_target()?;
|
||||
self.source_queue
|
||||
.back()
|
||||
.and_then(|(h, _)| if h.0 > current_best.0 { Some(h.clone()) } else { None })
|
||||
|
||||
Reference in New Issue
Block a user