mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 01:51:09 +00:00
staking miner: Check the queue one last time before submission (#4819)
* staking miner: use config for emergency solution Fixes #4678 * bump jsonrpsee * run `monitor_cmd_for` until the connection is closed * new tokio task for submit_and_watch xt * re-use header subscription * update jsonrpsee + simplify code * revert polkadot runtime changes * feat: add `ensure_no_better_solution` function * storage access for submissions and indices * check ensure_no_previous_solution before remote ext * fix todos * grumbles: Perbill::from_percent * hacky fix * use modified EPM pallet and various fixes * diener update --substrate --branch na-epm-pub * Revert "diener update --substrate --branch na-epm-pub" This reverts commit b3b9a58c9313372c8f21cf247ba0c8528d9953c0. * update substrate * tokio spawn on concurrent stuff * cleanup * Update utils/staking-miner/src/monitor.rs * Update utils/staking-miner/src/monitor.rs * more cleanup * fix nits * address grumbles * only run batch reqs when signed phase * better help menu for submission strategy CLI * add tests for submission strategy
This commit is contained in:
@@ -25,7 +25,6 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master
|
||||
sp-npos-elections = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
sc-transaction-pool-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
|
||||
frame-system = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
frame-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
frame-election-provider-support = { git = "https://github.com/paritytech/substrate", branch = "master" }
|
||||
|
||||
@@ -35,6 +35,8 @@ mod prelude;
|
||||
mod rpc;
|
||||
mod signer;
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
pub(crate) use prelude::*;
|
||||
pub(crate) use signer::get_account_info;
|
||||
|
||||
@@ -45,7 +47,7 @@ use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
|
||||
use remote_externalities::{Builder, Mode, OnlineConfig};
|
||||
use rpc::{RpcApiClient, SharedRpcClient};
|
||||
use sp_npos_elections::ExtendedBalance;
|
||||
use sp_runtime::{traits::Block as BlockT, DeserializeOwned};
|
||||
use sp_runtime::{traits::Block as BlockT, DeserializeOwned, Perbill};
|
||||
use tracing_subscriber::{fmt, EnvFilter};
|
||||
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
@@ -243,6 +245,7 @@ enum Error<T: EPM::Config> {
|
||||
IncorrectPhase,
|
||||
AlreadySubmitted,
|
||||
VersionMismatch,
|
||||
StrategyNotSatisfied,
|
||||
}
|
||||
|
||||
impl<T: EPM::Config> From<sp_core::crypto::SecretStringError> for Error<T> {
|
||||
@@ -299,6 +302,46 @@ enum Solver {
|
||||
},
|
||||
}
|
||||
|
||||
/// Submission strategy to use.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
#[cfg_attr(test, derive(PartialEq))]
|
||||
enum SubmissionStrategy {
|
||||
// Only submit if at the time, we are the best.
|
||||
IfLeading,
|
||||
// Always submit.
|
||||
Always,
|
||||
// Submit if we are leading, or if the solution that's leading is more that the given `Perbill`
|
||||
// better than us. This helps detect obviously fake solutions and still combat them.
|
||||
ClaimBetterThan(Perbill),
|
||||
}
|
||||
|
||||
/// Custom `impl` to parse `SubmissionStrategy` from CLI.
|
||||
///
|
||||
/// Possible options:
|
||||
/// * --submission-strategy if-leading: only submit if leading
|
||||
/// * --submission-strategy always: always submit
|
||||
/// * --submission-strategy "percent-better <percent>": submit if submission is `n` percent better.
|
||||
///
|
||||
impl FromStr for SubmissionStrategy {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let s = s.trim();
|
||||
|
||||
let res = if s == "if-leading" {
|
||||
Self::IfLeading
|
||||
} else if s == "always" {
|
||||
Self::Always
|
||||
} else if s.starts_with("percent-better ") {
|
||||
let percent: u32 = s[15..].parse().map_err(|e| format!("{:?}", e))?;
|
||||
Self::ClaimBetterThan(Perbill::from_percent(percent))
|
||||
} else {
|
||||
return Err(s.into())
|
||||
};
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
|
||||
frame_support::parameter_types! {
|
||||
/// Number of balancing iterations for a solution algorithm. Set based on the [`Solvers`] CLI
|
||||
/// config.
|
||||
@@ -320,6 +363,18 @@ struct MonitorConfig {
|
||||
/// The solver algorithm to use.
|
||||
#[clap(subcommand)]
|
||||
solver: Solver,
|
||||
|
||||
/// Submission strategy to use.
|
||||
///
|
||||
/// Possible options:
|
||||
///
|
||||
/// `--submission-strategy if-leading`: only submit if leading.
|
||||
///
|
||||
/// `--submission-strategy always`: always submit.
|
||||
///
|
||||
/// `--submission-strategy "percent-better <percent>"`: submit if the submission is `n` percent better.
|
||||
#[clap(long, parse(try_from_str), default_value = "if-leading")]
|
||||
submission_strategy: SubmissionStrategy,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Parser)]
|
||||
@@ -665,7 +720,8 @@ mod tests {
|
||||
seed_or_path: "//Alice".to_string(),
|
||||
command: Command::Monitor(MonitorConfig {
|
||||
listen: "head".to_string(),
|
||||
solver: Solver::SeqPhragmen { iterations: 10 }
|
||||
solver: Solver::SeqPhragmen { iterations: 10 },
|
||||
submission_strategy: SubmissionStrategy::IfLeading,
|
||||
}),
|
||||
}
|
||||
);
|
||||
@@ -727,4 +783,16 @@ mod tests {
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn submission_strategy_from_str_works() {
|
||||
use std::str::FromStr;
|
||||
|
||||
assert_eq!(SubmissionStrategy::from_str("if-leading"), Ok(SubmissionStrategy::IfLeading));
|
||||
assert_eq!(SubmissionStrategy::from_str("always"), Ok(SubmissionStrategy::Always));
|
||||
assert_eq!(
|
||||
SubmissionStrategy::from_str(" percent-better 99 "),
|
||||
Ok(SubmissionStrategy::ClaimBetterThan(Perbill::from_percent(99)))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,12 +16,16 @@
|
||||
|
||||
//! The monitor command.
|
||||
|
||||
use crate::{prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient};
|
||||
use crate::{
|
||||
prelude::*, rpc::*, signer::Signer, Error, MonitorConfig, SharedRpcClient, SubmissionStrategy,
|
||||
};
|
||||
use codec::Encode;
|
||||
use jsonrpsee::core::Error as RpcError;
|
||||
use sc_transaction_pool_api::TransactionStatus;
|
||||
use sp_core::storage::StorageKey;
|
||||
use sp_runtime::Perbill;
|
||||
use tokio::sync::mpsc;
|
||||
use EPM::{signed::SubmissionIndicesOf, SignedSubmissionOf};
|
||||
|
||||
/// Ensure that now is the signed phase.
|
||||
async fn ensure_signed_phase<T: EPM::Config, B: BlockT<Hash = Hash>>(
|
||||
@@ -43,21 +47,70 @@ async fn ensure_signed_phase<T: EPM::Config, B: BlockT<Hash = Hash>>(
|
||||
}
|
||||
|
||||
/// Ensure that our current `us` have not submitted anything previously.
|
||||
async fn ensure_no_previous_solution<
|
||||
T: EPM::Config + frame_system::Config<AccountId = AccountId>,
|
||||
B: BlockT,
|
||||
>(
|
||||
ext: &mut Ext,
|
||||
async fn ensure_no_previous_solution<T, B>(
|
||||
rpc: &SharedRpcClient,
|
||||
at: Hash,
|
||||
us: &AccountId,
|
||||
) -> Result<(), Error<T>> {
|
||||
use EPM::signed::SignedSubmissions;
|
||||
ext.execute_with(|| {
|
||||
if <SignedSubmissions<T>>::get().iter().any(|ss| &ss.who == us) {
|
||||
Err(Error::AlreadySubmitted)
|
||||
} else {
|
||||
Ok(())
|
||||
) -> Result<(), Error<T>>
|
||||
where
|
||||
T: EPM::Config + frame_system::Config<AccountId = AccountId, Hash = Hash>,
|
||||
B: BlockT,
|
||||
{
|
||||
let indices_key = StorageKey(EPM::SignedSubmissionIndices::<T>::hashed_key().to_vec());
|
||||
|
||||
let indices: SubmissionIndicesOf<T> = rpc
|
||||
.get_storage_and_decode(&indices_key, Some(at))
|
||||
.await
|
||||
.map_err::<Error<T>, _>(Into::into)?
|
||||
.unwrap_or_default();
|
||||
|
||||
for (_score, idx) in indices {
|
||||
let key = StorageKey(EPM::SignedSubmissionsMap::<T>::hashed_key_for(idx));
|
||||
|
||||
if let Some(submission) = rpc
|
||||
.get_storage_and_decode::<SignedSubmissionOf<T>>(&key, Some(at))
|
||||
.await
|
||||
.map_err::<Error<T>, _>(Into::into)?
|
||||
{
|
||||
if &submission.who == us {
|
||||
return Err(Error::AlreadySubmitted)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Reads all current solutions and checks the scores according to the `SubmissionStrategy`.
|
||||
async fn ensure_no_better_solution<T: EPM::Config, B: BlockT>(
|
||||
rpc: &SharedRpcClient,
|
||||
at: Hash,
|
||||
score: sp_npos_elections::ElectionScore,
|
||||
strategy: SubmissionStrategy,
|
||||
) -> Result<(), Error<T>> {
|
||||
let epsilon = match strategy {
|
||||
// don't care about current scores.
|
||||
SubmissionStrategy::Always => return Ok(()),
|
||||
SubmissionStrategy::IfLeading => Perbill::zero(),
|
||||
SubmissionStrategy::ClaimBetterThan(epsilon) => epsilon,
|
||||
};
|
||||
|
||||
let indices_key = StorageKey(EPM::SignedSubmissionIndices::<T>::hashed_key().to_vec());
|
||||
|
||||
let indices: SubmissionIndicesOf<T> = rpc
|
||||
.get_storage_and_decode(&indices_key, Some(at))
|
||||
.await
|
||||
.map_err::<Error<T>, _>(Into::into)?
|
||||
.unwrap_or_default();
|
||||
|
||||
// BTreeMap is ordered, take last to get the max score.
|
||||
if let Some(curr_max_score) = indices.into_iter().last().map(|(s, _)| s) {
|
||||
if !score.strict_threshold_better(curr_max_score, epsilon) {
|
||||
return Err(Error::StrategyNotSatisfied)
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
|
||||
@@ -131,39 +184,52 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
|
||||
config: MonitorConfig,
|
||||
) {
|
||||
|
||||
async fn flatten<T>(
|
||||
handle: tokio::task::JoinHandle<Result<T, StakingMinerError>>
|
||||
) -> Result<T, StakingMinerError> {
|
||||
match handle.await {
|
||||
Ok(Ok(result)) => Ok(result),
|
||||
Ok(Err(err)) => Err(err),
|
||||
Err(err) => panic!("tokio spawn task failed; kill task: {:?}", err),
|
||||
}
|
||||
}
|
||||
|
||||
let hash = at.hash();
|
||||
log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash);
|
||||
|
||||
// if the runtime version has changed, terminate.
|
||||
// block on this because if this fails there is no way to recover from
|
||||
// that error i.e, upgrade/downgrade required.
|
||||
if let Err(err) = crate::check_versions::<Runtime>(&rpc).await {
|
||||
let _ = tx.send(err.into());
|
||||
return;
|
||||
}
|
||||
|
||||
// we prefer doing this check before fetching anything into a remote-ext.
|
||||
if ensure_signed_phase::<Runtime, Block>(&rpc, hash).await.is_err() {
|
||||
log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all.");
|
||||
let rpc1 = rpc.clone();
|
||||
let rpc2 = rpc.clone();
|
||||
let account = signer.account.clone();
|
||||
|
||||
let signed_phase_fut = tokio::spawn(async move {
|
||||
ensure_signed_phase::<Runtime, Block>(&rpc1, hash).await
|
||||
});
|
||||
|
||||
let no_prev_sol_fut = tokio::spawn(async move {
|
||||
ensure_no_previous_solution::<Runtime, Block>(&rpc2, hash, &account).await
|
||||
});
|
||||
|
||||
// Run the calls in parallel and return once all has completed or any failed.
|
||||
if let Err(err) = tokio::try_join!(flatten(signed_phase_fut), flatten(no_prev_sol_fut)) {
|
||||
log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err);
|
||||
return;
|
||||
}
|
||||
|
||||
// grab an externalities without staking, just the election snapshot.
|
||||
let mut ext = match crate::create_election_ext::<Runtime, Block>(
|
||||
rpc.clone(),
|
||||
Some(hash),
|
||||
vec![],
|
||||
).await {
|
||||
let mut ext = match crate::create_election_ext::<Runtime, Block>(rpc.clone(), Some(hash), vec![]).await {
|
||||
Ok(ext) => ext,
|
||||
Err(err) => {
|
||||
let _ = tx.send(err);
|
||||
log::debug!(target: LOG_TARGET, "Skipping block {}; {}", at.number, err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if ensure_no_previous_solution::<Runtime, Block>(&mut ext, &signer.account).await.is_err() {
|
||||
log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping.");
|
||||
return;
|
||||
}
|
||||
|
||||
// mine a solution, and run feasibility check on it as well.
|
||||
let raw_solution = match crate::mine_with::<Runtime>(&config.solver, &mut ext, true) {
|
||||
Ok(r) => r,
|
||||
@@ -173,7 +239,8 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
|
||||
}
|
||||
};
|
||||
|
||||
log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score);
|
||||
let score = raw_solution.score;
|
||||
log::info!(target: LOG_TARGET, "mined solution with {:?}", score);
|
||||
|
||||
let nonce = match crate::get_account_info::<Runtime>(&rpc, &signer.account, Some(hash)).await {
|
||||
Ok(maybe_account) => {
|
||||
@@ -200,6 +267,25 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
|
||||
let extrinsic = ext.execute_with(|| create_uxt(raw_solution, signer.clone(), nonce, tip, era));
|
||||
let bytes = sp_core::Bytes(extrinsic.encode());
|
||||
|
||||
let rpc1 = rpc.clone();
|
||||
let rpc2 = rpc.clone();
|
||||
|
||||
let ensure_no_better_fut = tokio::spawn(async move {
|
||||
ensure_no_better_solution::<Runtime, Block>(&rpc1, hash, score, config.submission_strategy).await
|
||||
});
|
||||
|
||||
let ensure_signed_phase_fut = tokio::spawn(async move {
|
||||
ensure_signed_phase::<Runtime, Block>(&rpc2, hash).await
|
||||
});
|
||||
|
||||
// Run the calls in parallel and return once all has completed or any failed.
|
||||
if tokio::try_join!(
|
||||
flatten(ensure_no_better_fut),
|
||||
flatten(ensure_signed_phase_fut),
|
||||
).is_err() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut tx_subscription = match rpc.watch_extrinsic(&bytes).await {
|
||||
Ok(sub) => sub,
|
||||
Err(RpcError::RestartNeeded(e)) => {
|
||||
|
||||
Reference in New Issue
Block a user