staking miner: less aggresive submissions (#6978)

* staking miner: less aggresive submissions

We have noticed that the staking-miner performs many concurrent RPC calls (more than 256).
Probably because these batch request are getting bigger because the state is growing.

So let's relax this and mine solutions sequentially i.e, mine solution
one solution at the time and not in concurrently.

* add check if self hasn't submitted after mining
This commit is contained in:
Niklas Adolfsson
2023-03-30 16:01:18 +02:00
committed by GitHub
parent 55b4aceb99
commit bf8559a37c
2 changed files with 15 additions and 3 deletions
+14 -3
View File
@@ -24,7 +24,8 @@ use jsonrpsee::core::Error as RpcError;
use sc_transaction_pool_api::TransactionStatus;
use sp_core::storage::StorageKey;
use sp_runtime::Perbill;
use tokio::sync::mpsc;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf};
/// Ensure that now is the signed phase.
@@ -170,6 +171,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
let mut subscription = heads_subscription().await?;
let (tx, mut rx) = mpsc::unbounded_channel::<StakingMinerError>();
let submit_lock = Arc::new(Mutex::new(()));
loop {
let at = tokio::select! {
@@ -201,9 +203,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
// Spawn task and non-recoverable errors are sent back to the main task
// such as if the connection has been closed.
tokio::spawn(
send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone())
send_and_watch_extrinsic(rpc.clone(), tx.clone(), at, signer.clone(), config.clone(), submit_lock.clone())
);
}
/// Construct extrinsic at given block and watch it.
@@ -213,6 +214,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
at: Header,
signer: Signer,
config: MonitorConfig,
submit_lock: Arc<Mutex<()>>,
) {
async fn flatten<T>(
@@ -255,6 +257,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
return;
}
let _lock = submit_lock.lock().await;
let mut ext = match crate::create_election_ext::<Runtime, Block>(rpc.clone(), Some(hash), vec![]).await {
Ok(ext) => ext,
Err(err) => {
@@ -302,6 +306,7 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
let rpc1 = rpc.clone();
let rpc2 = rpc.clone();
let rpc3 = rpc.clone();
let latest_head = match get_latest_head::<Runtime>(&rpc, &config.listen).await {
Ok(hash) => hash,
@@ -325,10 +330,16 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
ensure_signed_phase::<Runtime, Block>(&rpc2, latest_head).await
});
let account = signer.account.clone();
let no_prev_sol_fut = tokio::spawn(async move {
ensure_no_previous_solution::<Runtime, Block>(&rpc3, latest_head, &account).await
});
// Run the calls in parallel and return once all has completed or any failed.
if let Err(err) = tokio::try_join!(
flatten(ensure_strategy_met_fut),
flatten(ensure_signed_phase_fut),
flatten(no_prev_sol_fut),
) {
log::debug!(target: LOG_TARGET, "Skipping to submit at block {}; {}", at.number, err);
return;
+1
View File
@@ -131,6 +131,7 @@ impl SharedRpcClient {
.connection_timeout(connection_timeout)
.max_request_body_size(u32::MAX)
.request_timeout(request_timeout)
.max_concurrent_requests(u32::MAX as usize)
.build(uri)
.await?;
Ok(Self(Arc::new(client)))