diff --git a/bridges/relays/bin-substrate/src/cli/resubmit_transactions.rs b/bridges/relays/bin-substrate/src/cli/resubmit_transactions.rs index 03a9c114b0..d20cff17f6 100644 --- a/bridges/relays/bin-substrate/src/cli/resubmit_transactions.rs +++ b/bridges/relays/bin-substrate/src/cli/resubmit_transactions.rs @@ -14,12 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see . -use crate::cli::{TargetConnectionParams, TargetSigningParams}; +use crate::cli::{Balance, TargetConnectionParams, TargetSigningParams}; use codec::{Decode, Encode}; use num_traits::{One, Zero}; use relay_substrate_client::{ - BlockWithJustification, Chain, Client, Error as SubstrateError, TransactionSignScheme, + BlockWithJustification, Chain, Client, Error as SubstrateError, HeaderOf, TransactionSignScheme, }; use relay_utils::FailedClient; use sp_core::Bytes; @@ -40,6 +40,19 @@ pub struct ResubmitTransactions { target: TargetConnectionParams, #[structopt(flatten)] target_sign: TargetSigningParams, + /// Number of blocks we see before considering queued transaction as stalled. + #[structopt(long, default_value = "5")] + stalled_blocks: u32, + /// Tip limit. We'll never submit transaction with larger tip. + #[structopt(long)] + tip_limit: Balance, + /// Tip increase step. We'll be checking updated transaction priority by increasing its tip by + /// this step. + #[structopt(long)] + tip_step: Balance, + /// Priority selection strategy. + #[structopt(subcommand)] + strategy: PrioritySelectionStrategy, } /// Chain, which transactions we're going to track && resubmit. @@ -47,6 +60,28 @@ pub struct ResubmitTransactions { #[strum(serialize_all = "kebab_case")] pub enum RelayChain { Millau, + Kusama, + Polkadot, +} + +/// Strategy to use for priority selection. +#[derive(StructOpt, Debug, PartialEq, Eq, Clone, Copy)] +pub enum PrioritySelectionStrategy { + /// Strategy selects tip that changes transaction priority to be better than priority of + /// the first transaction of previous block. + /// + /// It only makes sense to use this strategy for Millau transactions. Millau has transactions + /// that are close to block limits, so if there are any other queued transactions, 'large' + /// transaction won't fit the block && will be postponed. To avoid this, we change its priority + /// to some large value, making it best transaction => it'll be 'mined' first. + MakeItBestTransaction, + /// Strategy selects tip that changes transaction priority to be better than priority of + /// selected queued transaction. + /// + /// When we first see stalled transaction, we make it better than worst 1/4 of queued + /// transactions. If it is still stalled, we'll make it better than 1/3 of queued transactions, + /// ... + MakeItBetterThanQueuedTransaction, } macro_rules! select_bridge { @@ -56,20 +91,17 @@ macro_rules! select_bridge { type Target = relay_millau_client::Millau; type TargetSign = relay_millau_client::Millau; - // When large message is being sent from Millau to Rialto AND other transactions are - // blocking it from being mined, we'll see something like this in logs: - // - // Millau transaction priority with tip=0: 17800827994. Target priority: - // 526186677695 - // - // So since fee multiplier in Millau is `1` and `WeightToFee` is `IdentityFee`, then - // we need tip around `526186677695 - 17800827994 = 508_385_849_701`. Let's round it - // up to `1_000_000_000_000`. + $generic + }, + RelayChain::Kusama => { + type Target = relay_kusama_client::Kusama; + type TargetSign = relay_kusama_client::Kusama; - const TIP_STEP: bp_millau::Balance = 1_000_000_000; - const TIP_LIMIT: bp_millau::Balance = 1_000_000_000_000; - - const STALLED_BLOCKS: bp_millau::BlockNumber = 5; + $generic + }, + RelayChain::Polkadot => { + type Target = relay_polkadot_client::Polkadot; + type TargetSign = relay_polkadot_client::Polkadot; $generic }, @@ -91,11 +123,20 @@ impl ResubmitTransactions { client, key_pair.clone(), Context { + strategy: self.strategy, + best_header: HeaderOf::::new( + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ), transaction: None, + resubmitted: 0, stalled_for: Zero::zero(), - stalled_for_limit: STALLED_BLOCKS, - tip_step: TIP_STEP, - tip_limit: TIP_LIMIT, + stalled_for_limit: self.stalled_blocks.into(), + tip_step: self.tip_step.cast().into(), + tip_limit: self.tip_limit.cast().into(), }, ) }) @@ -104,10 +145,32 @@ impl ResubmitTransactions { } } -#[derive(Debug, Default)] +impl PrioritySelectionStrategy { + /// Select target priority. + async fn select_target_priority>( + &self, + client: &Client, + context: &Context, + ) -> Result, SubstrateError> { + match *self { + PrioritySelectionStrategy::MakeItBestTransaction => + read_previous_block_best_priority::(client, context).await, + PrioritySelectionStrategy::MakeItBetterThanQueuedTransaction => + select_priority_from_queue::(client, context).await, + } + } +} + +#[derive(Debug)] struct Context { + /// Priority selection strategy. + strategy: PrioritySelectionStrategy, + /// Best known block header. + best_header: C::Header, /// Hash of the (potentially) stalled transaction. transaction: Option, + /// How many times we have resubmitted this `transaction`? + resubmitted: u32, /// This transaction is in pool for `stalled_for` wakeup intervals. stalled_for: C::BlockNumber, /// When `stalled_for` reaching this limit, transaction is considered stalled. @@ -124,10 +187,11 @@ impl Context { self.stalled_for >= self.stalled_for_limit } - /// Forget stalled transaction. - fn clear(mut self) -> Self { - self.transaction = None; + /// Notice resubmitted transaction. + fn notice_resubmitted_transaction(mut self, transaction: C::Hash) -> Self { + self.transaction = Some(transaction); self.stalled_for = Zero::zero(); + self.resubmitted += 1; self } @@ -138,6 +202,7 @@ impl Context { } else { self.transaction = Some(transaction); self.stalled_for = One::one(); + self.resubmitted = 0; } self } @@ -172,8 +237,12 @@ async fn run_until_connection_lost async fn run_loop_iteration>( client: Client, key_pair: S::AccountKeyPair, - context: Context, + mut context: Context, ) -> Result, SubstrateError> { + // correct best header is required for all other actions + context.best_header = client.best_header().await?; + + // check if there's queued transaction, signed by given author let original_transaction = match lookup_signer_transaction::(&client, &key_pair).await? { Some(original_transaction) => original_transaction, None => { @@ -184,6 +253,7 @@ async fn run_loop_iteration>( let original_transaction_hash = C::Hasher::hash(&original_transaction.encode()); let context = context.notice_transaction(original_transaction_hash); + // if transaction hasn't been mined for `stalled_blocks`, we'll need to resubmit it if !context.is_stalled() { log::trace!( target: "bridge", @@ -196,18 +266,21 @@ async fn run_loop_iteration>( return Ok(context) } - let (best_block, target_priority) = match read_previous_best_priority::(&client).await? { - Some((best_block, target_priority)) => (best_block, target_priority), - None => { - log::trace!(target: "bridge", "Failed to read priority of best {} transaction in its best block", C::NAME); - return Ok(context) - }, - }; + // select priority for updated transaction + let target_priority = + match context.strategy.select_target_priority::(&client, &context).await? { + Some(target_priority) => target_priority, + None => { + log::trace!(target: "bridge", "Failed to select target priority"); + return Ok(context) + }, + }; - let (is_updated, updated_transaction) = select_transaction_tip::( + // update transaction tip + let (is_updated, updated_transaction) = update_transaction_tip::( &client, &key_pair, - best_block, + context.best_header.hash(), original_transaction, context.tip_step, context.tip_limit, @@ -232,7 +305,7 @@ async fn run_loop_iteration>( updated_transaction_hash, ); - Ok(context.clear()) + Ok(context.notice_resubmitted_transaction(updated_transaction_hash)) } /// Search transaction pool for transaction, signed by given key pair. @@ -255,31 +328,73 @@ async fn lookup_signer_transaction } /// Read priority of best signed transaction of previous block. -async fn read_previous_best_priority>( +async fn read_previous_block_best_priority>( client: &Client, -) -> Result, SubstrateError> { - let best_header = client.best_header().await?; - let best_header_hash = best_header.hash(); - let best_block = client.get_block(Some(best_header_hash)).await?; + context: &Context, +) -> Result, SubstrateError> { + let best_block = client.get_block(Some(context.best_header.hash())).await?; let best_transaction = best_block .extrinsics() .iter() .filter_map(|xt| S::SignedTransaction::decode(&mut &xt[..]).ok()) .find(|xt| S::is_signed(xt)); match best_transaction { - Some(best_transaction) => Ok(Some(( - best_header_hash, + Some(best_transaction) => Ok(Some( client - .validate_transaction(*best_header.parent_hash(), best_transaction) + .validate_transaction(*context.best_header.parent_hash(), best_transaction) .await?? .priority, - ))), + )), None => Ok(None), } } +/// Select priority of some queued transaction. +async fn select_priority_from_queue>( + client: &Client, + context: &Context, +) -> Result, SubstrateError> { + // select transaction from the queue + let queued_transactions = client.pending_extrinsics().await?; + let selected_transaction = match select_transaction_from_queue(queued_transactions, context) { + Some(selected_transaction) => selected_transaction, + None => return Ok(None), + }; + + let selected_transaction = S::SignedTransaction::decode(&mut &selected_transaction[..]) + .map_err(SubstrateError::ResponseParseFailed)?; + let target_priority = client + .validate_transaction(context.best_header.hash(), selected_transaction) + .await?? + .priority; + Ok(Some(target_priority)) +} + +/// Select transaction with target priority from the vec of queued transactions. +fn select_transaction_from_queue( + mut queued_transactions: Vec, + context: &Context, +) -> Option { + if queued_transactions.is_empty() { + return None + } + + // the more times we resubmit transaction (`context.resubmitted`), the closer we move + // to the front of the transaction queue + let total_transactions = queued_transactions.len(); + let resubmitted_factor = context.resubmitted; + let divisor = + 1usize.saturating_add(1usize.checked_shl(resubmitted_factor).unwrap_or(usize::MAX)); + let transactions_to_skip = total_transactions / divisor; + + Some( + queued_transactions + .swap_remove(std::cmp::min(total_transactions - 1, transactions_to_skip)), + ) +} + /// Try to find appropriate tip for transaction so that its priority is larger than given. -async fn select_transaction_tip>( +async fn update_transaction_tip>( client: &Client, key_pair: &S::AccountKeyPair, at_block: C::Hash, @@ -347,31 +462,97 @@ async fn select_transaction_tip>( #[cfg(test)] mod tests { use super::*; + use bp_rialto::Hash; use relay_rialto_client::Rialto; - #[test] - fn context_works() { - let mut context: Context = Context { + fn context() -> Context { + Context { + strategy: PrioritySelectionStrategy::MakeItBestTransaction, + best_header: HeaderOf::::new( + Default::default(), + Default::default(), + Default::default(), + Default::default(), + Default::default(), + ), transaction: None, + resubmitted: 0, stalled_for: Zero::zero(), stalled_for_limit: 3, tip_step: 100, tip_limit: 1000, - }; + } + } + + #[test] + fn context_works() { + let mut context = context(); // when transaction is noticed 2/3 times, it isn't stalled context = context.notice_transaction(Default::default()); assert!(!context.is_stalled()); + assert_eq!(context.stalled_for, 1); + assert_eq!(context.resubmitted, 0); context = context.notice_transaction(Default::default()); assert!(!context.is_stalled()); + assert_eq!(context.stalled_for, 2); + assert_eq!(context.resubmitted, 0); // when transaction is noticed for 3rd time in a row, it is considered stalled context = context.notice_transaction(Default::default()); assert!(context.is_stalled()); + assert_eq!(context.stalled_for, 3); + assert_eq!(context.resubmitted, 0); // and after we resubmit it, we forget previous transaction - context = context.clear(); - assert_eq!(context.transaction, None); + context = context.notice_resubmitted_transaction(Hash::from([1; 32])); + assert_eq!(context.transaction, Some(Hash::from([1; 32]))); + assert_eq!(context.resubmitted, 1); assert_eq!(context.stalled_for, 0); } + + #[test] + fn select_transaction_from_queue_works_with_empty_queue() { + assert_eq!(select_transaction_from_queue(vec![], &context()), None); + } + + #[test] + fn select_transaction_from_queue_works() { + let mut context = context(); + let queued_transactions = vec![ + Bytes(vec![1]), + Bytes(vec![2]), + Bytes(vec![3]), + Bytes(vec![4]), + Bytes(vec![5]), + Bytes(vec![6]), + ]; + + // when we resubmit tx for the first time, 1/2 of queue is skipped + assert_eq!( + select_transaction_from_queue(queued_transactions.clone(), &context), + Some(Bytes(vec![4])), + ); + + // when we resubmit tx for the second time, 1/3 of queue is skipped + context = context.notice_resubmitted_transaction(Hash::from([1; 32])); + assert_eq!( + select_transaction_from_queue(queued_transactions.clone(), &context), + Some(Bytes(vec![3])), + ); + + // when we resubmit tx for the third time, 1/5 of queue is skipped + context = context.notice_resubmitted_transaction(Hash::from([2; 32])); + assert_eq!( + select_transaction_from_queue(queued_transactions.clone(), &context), + Some(Bytes(vec![2])), + ); + + // when we resubmit tx for the second time, 1/9 of queue is skipped + context = context.notice_resubmitted_transaction(Hash::from([3; 32])); + assert_eq!( + select_transaction_from_queue(queued_transactions.clone(), &context), + Some(Bytes(vec![1])), + ); + } }