From 87c48f649b47e4357a1bdd7619f985640392019d Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky Date: Mon, 21 Jun 2021 09:16:06 +0300 Subject: [PATCH] Use plain source_queue view when selecting nonces for delivery (#1010) * use plain source_queue view when selecting nonces for delivery * Revert "use plain source_queue view when selecting nonces for delivery" This reverts commit f1fdc3fa9a0934271b125328e69e4f16014780d1. * Revert "Revert "use plain source_queue view when selecting nonces for delivery"" This reverts commit ccefa5ece24c675649251ebc5c686ef972eed2f4. * clippy * fmt --- .../messages/src/message_race_delivery.rs | 102 +++++++--- .../messages/src/message_race_strategy.rs | 174 ++++++++---------- 2 files changed, 154 insertions(+), 122 deletions(-) diff --git a/bridges/relays/messages/src/message_race_delivery.rs b/bridges/relays/messages/src/message_race_delivery.rs index 8e88b3763c..c4a5b235a5 100644 --- a/bridges/relays/messages/src/message_race_delivery.rs +++ b/bridges/relays/messages/src/message_race_delivery.rs @@ -23,7 +23,7 @@ use crate::message_race_loop::{ MessageRace, NoncesRange, RaceState, RaceStrategy, SourceClient, SourceClientNonces, TargetClient, TargetClientNonces, }; -use crate::message_race_strategy::BasicStrategy; +use crate::message_race_strategy::{BasicStrategy, SourceRangesQueue}; use crate::metrics::MessageLaneLoopMetrics; use async_trait::async_trait; @@ -31,7 +31,12 @@ use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight}; use futures::stream::FusedStream; use num_traits::{SaturatingAdd, Zero}; use relay_utils::FailedClient; -use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration}; +use std::{ + collections::VecDeque, + marker::PhantomData, + ops::{Range, RangeInclusive}, + time::Duration, +}; /// Run message delivery race. pub async fn run( @@ -499,22 +504,25 @@ where let lane_source_client = self.lane_source_client.clone(); let lane_target_client = self.lane_target_client.clone(); + let maximal_source_queue_index = self.strategy.maximal_available_source_queue_index(race_state)?; let previous_total_dispatch_weight = self.total_queued_dispatch_weight(); - let selected_nonces = self - .strategy - .select_nonces_to_deliver_with_selector(race_state.clone(), |range| async { - select_nonces_for_delivery_transaction( - relayer_mode, - max_nonces, - max_messages_weight_in_single_batch, - max_messages_size_in_single_batch, - lane_source_client.clone(), - lane_target_client.clone(), - range, - ) - .await - }) - .await?; + let source_queue = self.strategy.source_queue(); + let range_end = select_nonces_for_delivery_transaction( + relayer_mode, + max_nonces, + max_messages_weight_in_single_batch, + max_messages_size_in_single_batch, + lane_source_client.clone(), + lane_target_client.clone(), + source_queue, + 0..maximal_source_queue_index + 1, + ) + .await?; + + let range_begin = source_queue[0].1.begin(); + let selected_nonces = range_begin..=range_end; + self.strategy.remove_le_nonces_from_source_queue(range_end); + let new_total_dispatch_weight = self.total_queued_dispatch_weight(); let dispatch_weight = previous_total_dispatch_weight - new_total_dispatch_weight; @@ -533,6 +541,7 @@ where /// /// The function returns nonces that are NOT selected for current batch and will be /// delivered later. +#[allow(clippy::too_many_arguments)] async fn select_nonces_for_delivery_transaction( relayer_mode: RelayerMode, max_messages_in_this_batch: MessageNonce, @@ -540,8 +549,13 @@ async fn select_nonces_for_delivery_transaction( max_messages_size_in_single_batch: u32, lane_source_client: impl MessageLaneSourceClient

, lane_target_client: impl MessageLaneTargetClient

, - ready_nonces: MessageDetailsMap, -) -> Option> { + nonces_queue: &SourceRangesQueue< + P::SourceHeaderHash, + P::SourceHeaderNumber, + MessageDetailsMap, + >, + nonces_queue_range: Range, +) -> Option { let mut hard_selected_count = 0; let mut soft_selected_count = 0; @@ -563,7 +577,11 @@ async fn select_nonces_for_delivery_transaction( Zero::zero() }; - for (index, (nonce, details)) in ready_nonces.iter().enumerate() { + let all_ready_nonces = nonces_queue + .range(nonces_queue_range.clone()) + .flat_map(|(_, ready_nonces)| ready_nonces.iter()) + .enumerate(); + for (index, (nonce, details)) in all_ready_nonces { // Since we (hopefully) have some reserves in `max_messages_weight_in_single_batch` // and `max_messages_size_in_single_batch`, we may still try to submit transaction // with single message if message overflows these limits. The worst case would be if @@ -671,23 +689,27 @@ async fn select_nonces_for_delivery_transaction( selected_count = new_selected_count; } + let hard_selected_begin_nonce = nonces_queue[nonces_queue_range.start].1.begin(); if hard_selected_count != soft_selected_count { + let hard_selected_end_nonce = hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1; + let soft_selected_begin_nonce = hard_selected_begin_nonce; + let soft_selected_end_nonce = soft_selected_begin_nonce + soft_selected_count as MessageNonce - 1; log::warn!( target: "bridge", "Relayer may deliver nonces [{:?}; {:?}], but because of its strategy ({:?}) it has selected \ nonces [{:?}; {:?}].", - ready_nonces.keys().next(), - ready_nonces.keys().next().map(|begin| begin + (hard_selected_count as MessageNonce) - 1), + hard_selected_begin_nonce, + hard_selected_end_nonce, relayer_mode, - ready_nonces.keys().next(), - ready_nonces.keys().next().map(|begin| begin + (soft_selected_count as MessageNonce) - 1), - + soft_selected_begin_nonce, + soft_selected_end_nonce, ); hard_selected_count = soft_selected_count; } - if hard_selected_count != ready_nonces.len() { - Some(ready_nonces.into_iter().skip(hard_selected_count).collect()) + + if hard_selected_count != 0 { + Some(hard_selected_begin_nonce + hard_selected_count as MessageNonce - 1) } else { None } @@ -1127,4 +1149,30 @@ mod tests { Some(((20..=23), proof_parameters(false, 4))) ); } + + #[async_std::test] + async fn relayer_uses_flattened_view_of_the_source_queue_to_select_nonces() { + // Real scenario that has happened on test deployments: + // 1) relayer witnessed M1 at block 1 => it has separate entry in the `source_queue` + // 2) relayer witnessed M2 at block 2 => it has separate entry in the `source_queue` + // 3) if block 2 is known to the target node, then both M1 and M2 are selected for single delivery, + // even though weight(M1+M2) > larger than largest allowed weight + // + // This was happening because selector (`select_nonces_for_delivery_transaction`) has been called + // for every `source_queue` entry separately without preserving any context. + let (mut state, mut strategy) = prepare_strategy(); + let nonces = source_nonces(24..=25, 19, DEFAULT_REWARD - DELIVERY_TRANSACTION_COST); + strategy.strategy.source_nonces_updated(header_id(2), nonces); + strategy.max_unrewarded_relayer_entries_at_target = 100; + strategy.max_unconfirmed_nonces_at_target = 100; + strategy.max_messages_in_single_batch = 5; + strategy.max_messages_weight_in_single_batch = 100; + strategy.max_messages_size_in_single_batch = 100; + state.best_finalized_source_header_id_at_best_target = Some(header_id(2)); + + assert_eq!( + strategy.select_nonces_to_deliver(state).await, + Some(((20..=24), proof_parameters(false, 5))) + ); + } } diff --git a/bridges/relays/messages/src/message_race_strategy.rs b/bridges/relays/messages/src/message_race_strategy.rs index c17845d97f..ed4a276e14 100644 --- a/bridges/relays/messages/src/message_race_strategy.rs +++ b/bridges/relays/messages/src/message_race_strategy.rs @@ -22,7 +22,11 @@ use crate::message_race_loop::{NoncesRange, RaceState, RaceStrategy, SourceClien use async_trait::async_trait; use bp_messages::MessageNonce; use relay_utils::HeaderId; -use std::{collections::VecDeque, fmt::Debug, future::Future, marker::PhantomData, ops::RangeInclusive}; +use std::{collections::VecDeque, fmt::Debug, marker::PhantomData, ops::RangeInclusive}; + +/// Queue of nonces known to the source node. +pub type SourceRangesQueue = + VecDeque<(HeaderId, SourceNoncesRange)>; /// Nonces delivery strategy. #[derive(Debug)] @@ -35,7 +39,7 @@ pub struct BasicStrategy< Proof, > { /// All queued nonces. - source_queue: VecDeque<(HeaderId, SourceNoncesRange)>, + source_queue: SourceRangesQueue, /// Best nonce known to target node (at its best block). `None` if it has not been received yet. best_target_nonce: Option, /// Unused generic types dump. @@ -73,25 +77,21 @@ where &mut self.source_queue } - /// Should return `Some(nonces)` if we need to deliver proof of `nonces` (and associated - /// data) from source to target node. + /// Returns index of the latest source queue entry, that may be delivered to the target node. /// - /// The `selector` function receives range of nonces and should return `None` if the whole - /// range needs to be delivered. If there are some nonces in the range that can't be delivered - /// right now, it should return `Some` with 'undeliverable' nonces. Please keep in mind that - /// this should be the sub-range that the passed range ends with, because nonces are always - /// delivered in-order. Otherwise the function will panic. - pub async fn select_nonces_to_deliver_with_selector>>( - &mut self, + /// Returns `None` if no entries may be delivered. All entries before and including the `Some(_)` + /// index are guaranteed to be witnessed at source blocks that are known to be finalized at the + /// target node. + pub fn maximal_available_source_queue_index( + &self, race_state: RaceState< HeaderId, HeaderId, Proof, >, - selector: impl Fn(SourceNoncesRange) -> F, - ) -> Option> { + ) -> Option { // if we do not know best nonce at target node, we can't select anything - let target_nonce = self.best_target_nonce?; + let _ = self.best_target_nonce?; // if we have already selected nonces that we want to submit, do nothing if race_state.nonces_to_submit.is_some() { @@ -107,47 +107,26 @@ where // 2) we can't deliver new nonce until header, that has emitted this nonce, is finalized // by target client // 3) selector is used for more complicated logic - let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target.clone()?; - let mut nonces_end = None; + // + // => let's first select range of entries inside deque that are already finalized at + // the target client and pass this range to the selector + let best_header_at_target = race_state.best_finalized_source_header_id_at_best_target?; + self.source_queue + .iter() + .enumerate() + .take_while(|(_, (queued_at, _))| queued_at.0 <= best_header_at_target.0) + .map(|(index, _)| index) + .last() + } + + /// Remove all nonces that are less than or equal to given nonce from the source queue. + pub fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) { while let Some((queued_at, queued_range)) = self.source_queue.pop_front() { - // select (sub) range to deliver - let queued_range_begin = queued_range.begin(); - let queued_range_end = queued_range.end(); - let range_to_requeue = if queued_at.0 > best_header_at_target.0 { - // if header that has queued the range is not yet finalized at bridged chain, - // we can't prove anything - Some(queued_range) - } else { - // selector returns `Some(range)` if this `range` needs to be requeued - selector(queued_range).await - }; - - // requeue (sub) range and update range to deliver - match range_to_requeue { - Some(range_to_requeue) => { - assert!( - range_to_requeue.begin() <= range_to_requeue.end() - && range_to_requeue.begin() >= queued_range_begin - && range_to_requeue.end() == queued_range_end, - "Incorrect implementation of internal `selector` function. Expected original\ - range {:?} to end with returned range {:?}", - queued_range_begin..=queued_range_end, - range_to_requeue, - ); - - if range_to_requeue.begin() != queued_range_begin { - nonces_end = Some(range_to_requeue.begin() - 1); - } - self.source_queue.push_front((queued_at, range_to_requeue)); - break; - } - None => { - nonces_end = Some(queued_range_end); - } + if let Some(range_to_requeue) = queued_range.greater_than(nonce) { + self.source_queue.push_front((queued_at, range_to_requeue)); + break; } } - - nonces_end.map(|nonces_end| RangeInclusive::new(target_nonce + 1, nonces_end)) } } @@ -288,9 +267,11 @@ where Proof, >, ) -> Option<(RangeInclusive, Self::ProofParameters)> { - self.select_nonces_to_deliver_with_selector(race_state, |_| async { None }) - .await - .map(|range| (range, ())) + let maximal_source_queue_index = self.maximal_available_source_queue_index(race_state)?; + let range_begin = self.source_queue[0].1.begin(); + let range_end = self.source_queue[maximal_source_queue_index].1.end(); + self.remove_le_nonces_from_source_queue(range_end); + Some((range_begin..=range_end, ())) } } @@ -298,7 +279,9 @@ where mod tests { use super::*; use crate::message_lane::MessageLane; - use crate::message_lane_loop::tests::{header_id, TestMessageLane, TestMessagesProof}; + use crate::message_lane_loop::tests::{ + header_id, TestMessageLane, TestMessagesProof, TestSourceHeaderHash, TestSourceHeaderNumber, + }; type SourceNoncesRange = RangeInclusive; @@ -454,57 +437,58 @@ mod tests { } #[test] - fn select_nonces_to_deliver_able_to_split_ranges_with_selector() { + fn maximal_available_source_queue_index_works() { let mut state = RaceState::<_, _, TestMessagesProof>::default(); let mut strategy = BasicStrategy::::new(); strategy.best_target_nonces_updated(target_nonces(0), &mut state); - strategy.source_nonces_updated(header_id(1), source_nonces(1..=100)); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=3)); + strategy.source_nonces_updated(header_id(2), source_nonces(4..=6)); + strategy.source_nonces_updated(header_id(3), source_nonces(7..=9)); + + state.best_finalized_source_header_id_at_best_target = Some(header_id(0)); + assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), None); - state.best_finalized_source_header_id_at_source = Some(header_id(1)); state.best_finalized_source_header_id_at_best_target = Some(header_id(1)); - state.best_target_header_id = Some(header_id(1)); + assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(0)); - assert_eq!( - async_std::task::block_on( - strategy.select_nonces_to_deliver_with_selector(state, |_| async { Some(50..=100) }) - ), - Some(1..=49), - ); + state.best_finalized_source_header_id_at_best_target = Some(header_id(2)); + assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(1)); + + state.best_finalized_source_header_id_at_best_target = Some(header_id(3)); + assert_eq!(strategy.maximal_available_source_queue_index(state.clone()), Some(2)); + + state.best_finalized_source_header_id_at_best_target = Some(header_id(4)); + assert_eq!(strategy.maximal_available_source_queue_index(state), Some(2)); } - fn run_panic_test_for_incorrect_selector( - invalid_selector: impl Fn(SourceNoncesRange) -> Option, - ) { + #[test] + fn remove_le_nonces_from_source_queue_works() { let mut state = RaceState::<_, _, TestMessagesProof>::default(); let mut strategy = BasicStrategy::::new(); - strategy.source_nonces_updated(header_id(1), source_nonces(1..=100)); - strategy.best_target_nonces_updated(target_nonces(50), &mut state); - state.best_finalized_source_header_id_at_source = Some(header_id(1)); - state.best_finalized_source_header_id_at_best_target = Some(header_id(1)); - state.best_target_header_id = Some(header_id(1)); - async_std::task::block_on(async move { - strategy - .select_nonces_to_deliver_with_selector(state, |range| async { invalid_selector(range) }) - .await; - }); - } + strategy.best_target_nonces_updated(target_nonces(0), &mut state); + strategy.source_nonces_updated(header_id(1), source_nonces(1..=3)); + strategy.source_nonces_updated(header_id(2), source_nonces(4..=6)); + strategy.source_nonces_updated(header_id(3), source_nonces(7..=9)); - #[test] - #[should_panic] - fn select_nonces_to_deliver_panics_if_selector_returns_empty_range() { - #[allow(clippy::reversed_empty_ranges)] - run_panic_test_for_incorrect_selector(|_| Some(2..=1)) - } + fn source_queue_nonces( + source_queue: &SourceRangesQueue, + ) -> Vec { + source_queue.iter().flat_map(|(_, range)| range.clone()).collect() + } - #[test] - #[should_panic] - fn select_nonces_to_deliver_panics_if_selector_returns_range_that_starts_before_passed_range() { - run_panic_test_for_incorrect_selector(|range| Some(range.begin() - 1..=*range.end())) - } + strategy.remove_le_nonces_from_source_queue(1); + assert_eq!( + source_queue_nonces(&strategy.source_queue), + vec![2, 3, 4, 5, 6, 7, 8, 9], + ); - #[test] - #[should_panic] - fn select_nonces_to_deliver_panics_if_selector_returns_range_with_mismatched_end() { - run_panic_test_for_incorrect_selector(|range| Some(range.begin()..=*range.end() + 1)) + strategy.remove_le_nonces_from_source_queue(5); + assert_eq!(source_queue_nonces(&strategy.source_queue), vec![6, 7, 8, 9],); + + strategy.remove_le_nonces_from_source_queue(9); + assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::::new(),); + + strategy.remove_le_nonces_from_source_queue(100); + assert_eq!(source_queue_nonces(&strategy.source_queue), Vec::::new(),); } }