Use plain source_queue view when selecting nonces for delivery (#1010)

* use plain source_queue view when selecting nonces for delivery

* Revert "use plain source_queue view when selecting nonces for delivery"

This reverts commit f1fdc3fa9a0934271b125328e69e4f16014780d1.

* Revert "Revert "use plain source_queue view when selecting nonces for delivery""

This reverts commit ccefa5ece24c675649251ebc5c686ef972eed2f4.

* clippy

* fmt
This commit is contained in:
Svyatoslav Nikolsky
2021-06-21 09:16:06 +03:00
committed by Bastian Köcher
parent 20b7f341e1
commit 87c48f649b
2 changed files with 154 additions and 122 deletions
@@ -23,7 +23,7 @@ use crate::message_race_loop::{
MessageRace, NoncesRange, RaceState, RaceStrategy, SourceClient, SourceClientNonces, TargetClient,
TargetClientNonces,
};
use crate::message_race_strategy::BasicStrategy;
use crate::message_race_strategy::{BasicStrategy, SourceRangesQueue};
use crate::metrics::MessageLaneLoopMetrics;
use async_trait::async_trait;
@@ -31,7 +31,12 @@ use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight};
use futures::stream::FusedStream;
use num_traits::{SaturatingAdd, Zero};
use relay_utils::FailedClient;
use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration};
use std::{
collections::VecDeque,
marker::PhantomData,
ops::{Range, RangeInclusive},
time::Duration,
};
/// Run message delivery race.
pub async fn run<P: MessageLane>(
@@ -499,22 +504,25 @@ where
let lane_source_client = self.lane_source_client.clone();
let lane_target_client = self.lane_target_client.clone();
let maximal_source_queue_index = self.strategy.maximal_available_source_queue_index(race_state)?;
let previous_total_dispatch_weight = self.total_queued_dispatch_weight();
let selected_nonces = self
.strategy
.select_nonces_to_deliver_with_selector(race_state.clone(), |range| async {
select_nonces_for_delivery_transaction(
let source_queue = self.strategy.source_queue();
let range_end = select_nonces_for_delivery_transaction(
relayer_mode,
max_nonces,
max_messages_weight_in_single_batch,
max_messages_size_in_single_batch,
lane_source_client.clone(),
lane_target_client.clone(),
range,
source_queue,
0..maximal_source_queue_index + 1,
)
.await
})
.await?;
let range_begin = source_queue[0].1.begin();
let selected_nonces = range_begin..=range_end;
self.strategy.remove_le_nonces_from_source_queue(range_end);
let new_total_dispatch_weight = self.total_queued_dispatch_weight();
let dispatch_weight = previous_total_dispatch_weight - new_total_dispatch_weight;
@@ -533,6 +541,7 @@ where
///
/// The function returns nonces that are NOT selected for current batch and will be
/// delivered later.
#[allow(clippy::too_many_arguments)]
async fn select_nonces_for_delivery_transaction<P: MessageLane>(
relayer_mode: RelayerMode,
max_messages_in_this_batch: MessageNonce,
@@ -540,8 +549,13 @@ async fn select_nonces_for_delivery_transaction<P: MessageLane>(
max_messages_size_in_single_batch: u32,
lane_source_client: impl MessageLaneSourceClient<P>,
lane_target_client: impl MessageLaneTargetClient<P>,
ready_nonces: MessageDetailsMap<P::SourceChainBalance>,
) -> Option<MessageDetailsMap<P::SourceChainBalance>> {
nonces_queue: &SourceRangesQueue<
P::SourceHeaderHash,
P::SourceHeaderNumber,
MessageDetailsMap<P::SourceChainBalance>,
>,
nonces_queue_range: Range<usize>,
) -> Option<MessageNonce> {
let mut hard_selected_count = 0;
let mut soft_selected_count = 0;
@@ -563,7 +577,11 @@ async fn select_nonces_for_delivery_transaction<P: MessageLane>(
Zero::zero()
};
for (index, (nonce, details)) in ready_nonces.iter().enumerate() {
let all_ready_nonces = nonces_queue
.range(nonces_queue_range.clone())
.flat_map(|(_, ready_nonces)| ready_nonces.iter())
.enumerate();
for (index, (nonce, details)) in all_ready_nonces {
// Since we (hopefully) have some reserves in `max_messages_weight_in_single_batch`
// and `max_messages_size_in_single_batch`, we may still try to submit transaction
// with single message if message overflows these limits. The worst case would be if
@@ -671,23 +689,27 @@ async fn select_nonces_for_delivery_transaction<P: MessageLane>(
selected_count = new_selected_count;
}
let hard_selected_begin_nonce = nonces_queue[nonces_queue_range.start].1.begin();
if hard_selected_count != soft_selected_count {
let hard_selected_end_nonce = hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1;
let soft_selected_begin_nonce = hard_selected_begin_nonce;
let soft_selected_end_nonce = soft_selected_begin_nonce + soft_selected_count as MessageNonce - 1;
log::warn!(
target: "bridge",
"Relayer may deliver nonces [{:?}; {:?}], but because of its strategy ({:?}) it has selected \
nonces [{:?}; {:?}].",
ready_nonces.keys().next(),
ready_nonces.keys().next().map(|begin| begin + (hard_selected_count as MessageNonce) - 1),
hard_selected_begin_nonce,
hard_selected_end_nonce,
relayer_mode,
ready_nonces.keys().next(),
ready_nonces.keys().next().map(|begin| begin + (soft_selected_count as MessageNonce) - 1),
soft_selected_begin_nonce,
soft_selected_end_nonce,
);
hard_selected_count = soft_selected_count;
}
if hard_selected_count != ready_nonces.len() {
Some(ready_nonces.into_iter().skip(hard_selected_count).collect())
if hard_selected_count != 0 {
Some(hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1)
} else {
None
}
@@ -1127,4 +1149,30 @@ mod tests {
Some(((20..=23), proof_parameters(false, 4)))
);
}
#[async_std::test]
async fn relayer_uses_flattened_view_of_the_source_queue_to_select_nonces() {
// Real scenario that has happened on test deployments:
// 1) relayer witnessed M1 at block 1 => it has separate entry in the `source_queue`
// 2) relayer witnessed M2 at block 2 => it has separate entry in the `source_queue`
// 3) if block 2 is known to the target node, then both M1 and M2 are selected for single delivery,
// even though weight(M1+M2) > larger than largest allowed weight
//
// This was happening because selector (`select_nonces_for_delivery_transaction`) has been called
// for every `source_queue` entry separately without preserving any context.
let (mut state, mut strategy) = prepare_strategy();
let nonces = source_nonces(24..=25, 19, DEFAULT_REWARD - DELIVERY_TRANSACTION_COST);
strategy.strategy.source_nonces_updated(header_id(2), nonces);
strategy.max_unrewarded_relayer_entries_at_target = 100;
strategy.max_unconfirmed_nonces_at_target = 100;
strategy.max_messages_in_single_batch = 5;
strategy.max_messages_weight_in_single_batch = 100;
strategy.max_messages_size_in_single_batch = 100;
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
assert_eq!(
strategy.select_nonces_to_deliver(state).await,
Some(((20..=24), proof_parameters(false, 5)))
);
}
}
@@ -22,7 +22,11 @@ use crate::message_race_loop::{NoncesRange, RaceState, RaceStrategy, SourceClien
use async_trait::async_trait;
use bp_messages::MessageNonce;
use relay_utils::HeaderId;
use std::{collections::VecDeque, fmt::Debug, future::Future, marker::PhantomData, ops::RangeInclusive};
use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, ops::RangeInclusive};
/// Queue of nonces known to the source node.
pub type SourceRangesQueue<SourceHeaderHash, SourceHeaderNumber, SourceNoncesRange> =
VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)>;
/// Nonces delivery strategy.
#[derive(Debug)]
@@ -35,7 +39,7 @@ pub struct BasicStrategy<
Proof,
> {
/// All queued nonces.
source_queue: VecDeque<(HeaderId<SourceHeaderHash, SourceHeaderNumber>, SourceNoncesRange)>,
source_queue: SourceRangesQueue<SourceHeaderHash, SourceHeaderNumber, SourceNoncesRange>,
/// Best nonce known to target node (at its best block). `None` if it has not been received yet.
best_target_nonce: Option<MessageNonce>,
/// Unused generic types dump.
@@ -73,25 +77,21 @@ where
&mut self.source_queue
}
/// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated
/// data) from source to target node.
/// Returns index of the latest source queue entry, that may be delivered to the target node.
///
/// The `selector` function receives range of nonces and should return `None` if the whole
/// range needs to be delivered. If there are some nonces in the range that can't be delivered
/// right now, it should return `Some` with 'undeliverable' nonces. Please keep in mind that
/// this should be the sub-range that the passed range ends with, because nonces are always
/// delivered in-order. Otherwise the function will panic.
pub async fn select_nonces_to_deliver_with_selector<F: Future<Output = Option<SourceNoncesRange>>>(
&mut self,
/// Returns `None` if no entries may be delivered. All entries before and including the `Some(_)`
/// index are guaranteed to be witnessed at source blocks that are known to be finalized at the
/// target node.
pub fn maximal_available_source_queue_index(
&self,
race_state: RaceState<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
Proof,
>,
selector: impl Fn(SourceNoncesRange) -> F,
) -> Option<RangeInclusive<MessageNonce>> {
) -> Option<usize> {
// if we do not know best nonce at target node, we can't select anything
let target_nonce = self.best_target_nonce?;
let _ = self.best_target_nonce?;
// if we have already selected nonces that we want to submit, do nothing
if race_state.nonces_to_submit.is_some() {
@@ -107,50 +107,29 @@ where
// 2) we can't deliver new nonce until header, that has emitted this nonce, is finalized
// by target client
// 3) selector is used for more complicated logic
let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target.clone()?;
let mut nonces_end = None;
while let Some((queued_at, queued_range)) = self.source_queue.pop_front() {
// select (sub) range to deliver
let queued_range_begin = queued_range.begin();
let queued_range_end = queued_range.end();
let range_to_requeue = if queued_at.0 > best_header_at_target.0 {
// if header that has queued the range is not yet finalized at bridged chain,
// we can't prove anything
Some(queued_range)
} else {
// selector returns `Some(range)` if this `range` needs to be requeued
selector(queued_range).await
};
// requeue (sub) range and update range to deliver
match range_to_requeue {
Some(range_to_requeue) => {
assert!(
range_to_requeue.begin() <= range_to_requeue.end()
&& range_to_requeue.begin() >= queued_range_begin
&& range_to_requeue.end() == queued_range_end,
"Incorrect implementation of internal `selector` function. Expected original\
range {:?} to end with returned range {:?}",
queued_range_begin..=queued_range_end,
range_to_requeue,
);
if range_to_requeue.begin() != queued_range_begin {
nonces_end = Some(range_to_requeue.begin() - 1);
//
// => let's first select range of entries inside deque that are already finalized at
// the target client and pass this range to the selector
let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target?;
self.source_queue
.iter()
.enumerate()
.take_while(|(_, (queued_at, _))| queued_at.0 <= best_header_at_target.0)
.map(|(index, _)| index)
.last()
}
/// Remove all nonces that are less than or equal to given nonce from the source queue.
pub fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) {
while let Some((queued_at, queued_range)) = self.source_queue.pop_front() {
if let Some(range_to_requeue) = queued_range.greater_than(nonce) {
self.source_queue.push_front((queued_at, range_to_requeue));
break;
}
None => {
nonces_end = Some(queued_range_end);
}
}
}
nonces_end.map(|nonces_end| RangeInclusive::new(target_nonce + 1, nonces_end))
}
}
#[async_trait]
impl<SourceHeaderNumber, SourceHeaderHash, TargetHeaderNumber, TargetHeaderHash, SourceNoncesRange, Proof>
RaceStrategy<HeaderId<SourceHeaderHash, SourceHeaderNumber>, HeaderId<TargetHeaderHash, TargetHeaderNumber>, Proof>
@@ -288,9 +267,11 @@ where
Proof,
>,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
self.select_nonces_to_deliver_with_selector(race_state, |_| async { None })
.await
.map(|range| (range, ()))
let maximal_source_queue_index = self.maximal_available_source_queue_index(race_state)?;
let range_begin = self.source_queue[0].1.begin();
let range_end = self.source_queue[maximal_source_queue_index].1.end();
self.remove_le_nonces_from_source_queue(range_end);
Some((range_begin..=range_end, ()))
}
}
@@ -298,7 +279,9 @@ where
mod tests {
use super::*;
use crate::message_lane::MessageLane;
use crate::message_lane_loop::tests::{header_id, TestMessageLane, TestMessagesProof};
use crate::message_lane_loop::tests::{
header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderHash, TestSourceHeaderNumber,
};
type SourceNoncesRange = RangeInclusive<MessageNonce>;
@@ -454,57 +437,58 @@ mod tests {
}
#[test]
fn select_nonces_to_deliver_able_to_split_ranges_with_selector() {
fn maximal_available_source_queue_index_works() {
let mut state = RaceState::<_, _, TestMessagesProof>::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=100));
strategy.source_nonces_updated(header_id(1), source_nonces(1..=3));
strategy.source_nonces_updated(header_id(2), source_nonces(4..=6));
strategy.source_nonces_updated(header_id(3), source_nonces(7..=9));
state.best_finalized_source_header_id_at_best_target = Some(header_id(0));
assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), None);
state.best_finalized_source_header_id_at_source = Some(header_id(1));
state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
state.best_target_header_id = Some(header_id(1));
assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(0));
assert_eq!(
async_std::task::block_on(
strategy.select_nonces_to_deliver_with_selector(state, |_| async { Some(50..=100) })
),
Some(1..=49),
);
state.best_finalized_source_header_id_at_best_target = Some(header_id(2));
assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(1));
state.best_finalized_source_header_id_at_best_target = Some(header_id(3));
assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(2));
state.best_finalized_source_header_id_at_best_target = Some(header_id(4));
assert_eq!(strategy.maximal_available_source_queue_index(state), Some(2));
}
fn run_panic_test_for_incorrect_selector(
invalid_selector: impl Fn(SourceNoncesRange) -> Option<SourceNoncesRange>,
) {
#[test]
fn remove_le_nonces_from_source_queue_works() {
let mut state = RaceState::<_, _, TestMessagesProof>::default();
let mut strategy = BasicStrategy::<TestMessageLane>::new();
strategy.source_nonces_updated(header_id(1), source_nonces(1..=100));
strategy.best_target_nonces_updated(target_nonces(50), &mut state);
state.best_finalized_source_header_id_at_source = Some(header_id(1));
state.best_finalized_source_header_id_at_best_target = Some(header_id(1));
state.best_target_header_id = Some(header_id(1));
async_std::task::block_on(async move {
strategy
.select_nonces_to_deliver_with_selector(state, |range| async { invalid_selector(range) })
.await;
});
strategy.best_target_nonces_updated(target_nonces(0), &mut state);
strategy.source_nonces_updated(header_id(1), source_nonces(1..=3));
strategy.source_nonces_updated(header_id(2), source_nonces(4..=6));
strategy.source_nonces_updated(header_id(3), source_nonces(7..=9));
fn source_queue_nonces(
source_queue: &SourceRangesQueue<TestSourceHeaderHash, TestSourceHeaderNumber, SourceNoncesRange>,
) -> Vec<MessageNonce> {
source_queue.iter().flat_map(|(_, range)| range.clone()).collect()
}
#[test]
#[should_panic]
fn select_nonces_to_deliver_panics_if_selector_returns_empty_range() {
#[allow(clippy::reversed_empty_ranges)]
run_panic_test_for_incorrect_selector(|_| Some(2..=1))
}
strategy.remove_le_nonces_from_source_queue(1);
assert_eq!(
source_queue_nonces(&strategy.source_queue),
vec![2, 3, 4, 5, 6, 7, 8, 9],
);
#[test]
#[should_panic]
fn select_nonces_to_deliver_panics_if_selector_returns_range_that_starts_before_passed_range() {
run_panic_test_for_incorrect_selector(|range| Some(range.begin() - 1..=*range.end()))
}
strategy.remove_le_nonces_from_source_queue(5);
assert_eq!(source_queue_nonces(&strategy.source_queue), vec![6, 7, 8, 9],);
#[test]
#[should_panic]
fn select_nonces_to_deliver_panics_if_selector_returns_range_with_mismatched_end() {
run_panic_test_for_incorrect_selector(|range| Some(range.begin()..=*range.end() + 1))
strategy.remove_le_nonces_from_source_queue(9);
assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::<MessageNonce>::new(),);
strategy.remove_le_nonces_from_source_queue(100);
assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::<MessageNonce>::new(),);
}
}