From bf8559a37cbea7d0cbb97189d5246ebe304c3484 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 30 Mar 2023 16:01:18 +0200 Subject: [PATCH] staking miner: less aggresive submissions (#6978) * staking miner: less aggresive submissions We have noticed that the staking-miner performs many concurrent RPC calls (more than 256). Probably because these batch request are getting bigger because the state is growing. So let's relax this and mine solutions sequentially i.e, mine solution one solution at the time and not in concurrently. * add check if self hasn't submitted after mining --- polkadot/utils/staking-miner/src/monitor.rs | 17 ++++++++++++++--- polkadot/utils/staking-miner/src/rpc.rs | 1 + 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/polkadot/utils/staking-miner/src/monitor.rs b/polkadot/utils/staking-miner/src/monitor.rs index bfc075668e..2395c7b7fc 100644 --- a/polkadot/utils/staking-miner/src/monitor.rs +++ b/polkadot/utils/staking-miner/src/monitor.rs @@ -24,7 +24,8 @@ use jsonrpsee::core::Error as RpcError; use sc_transaction_pool_api::TransactionStatus; use sp_core::storage::StorageKey; use sp_runtime::Perbill; -use tokio::sync::mpsc; +use std::sync::Arc; +use tokio::sync::{mpsc, Mutex}; use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf}; /// Ensure that now is the signed phase. @@ -170,6 +171,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let mut subscription = heads_subscription().await?; let (tx, mut rx) = mpsc::unbounded_channel::(); + let submit_lock = Arc::new(Mutex::new(())); loop { let at = tokio::select! { @@ -201,9 +203,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { // Spawn task and non-recoverable errors are sent back to the main task // such as if the connection has been closed. tokio::spawn( - send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone()) + send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone(), submit_lock.clone()) ); - } /// Construct extrinsic at given block and watch it. @@ -213,6 +214,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { at: Header, signer: Signer, config: MonitorConfig, + submit_lock: Arc>, ) { async fn flatten( @@ -255,6 +257,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { return; } + let _lock = submit_lock.lock().await; + let mut ext = match crate::create_election_ext::(rpc.clone(), Some(hash), vec![]).await { Ok(ext) => ext, Err(err) => { @@ -302,6 +306,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { let rpc1 = rpc.clone(); let rpc2 = rpc.clone(); + let rpc3 = rpc.clone(); let latest_head = match get_latest_head::(&rpc, &config.listen).await { Ok(hash) => hash, @@ -325,10 +330,16 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! { ensure_signed_phase::(&rpc2, latest_head).await }); + let account = signer.account.clone(); + let no_prev_sol_fut = tokio::spawn(async move { + ensure_no_previous_solution::(&rpc3, latest_head, &account).await + }); + // Run the calls in parallel and return once all has completed or any failed. if let Err(err) = tokio::try_join!( flatten(ensure_strategy_met_fut), flatten(ensure_signed_phase_fut), + flatten(no_prev_sol_fut), ) { log::debug!(target: LOG_TARGET, "Skipping to submit at block {}; {}", at.number, err); return; diff --git a/polkadot/utils/staking-miner/src/rpc.rs b/polkadot/utils/staking-miner/src/rpc.rs index 8929afcbe6..1da24b4aae 100644 --- a/polkadot/utils/staking-miner/src/rpc.rs +++ b/polkadot/utils/staking-miner/src/rpc.rs @@ -131,6 +131,7 @@ impl SharedRpcClient { .connection_timeout(connection_timeout) .max_request_body_size(u32::MAX) .request_timeout(request_timeout) + .max_concurrent_requests(u32::MAX as usize) .build(uri) .await?; Ok(Self(Arc::new(client)))