Ensure election offchain workers don't overlap (#8828)

* Initial version, well tested, should work fine.

* Add one last log line

* Update frame/election-provider-multi-phase/src/unsigned.rs

Co-authored-by: Gavin Wood <gavin@parity.io>

* Update frame/election-provider-multi-phase/src/unsigned.rs

Co-authored-by: Guillaume Thiolliere <gui.thiolliere@gmail.com>

* Update frame/election-provider-multi-phase/src/unsigned.rs

Co-authored-by: Guillaume Thiolliere <gui.thiolliere@gmail.com>

* Fix a few more things

* fix build

* rewrite the whole thing with a proper lock

* clean

* clean some nits

* Add unit tests.

* Update primitives/runtime/src/offchain/storage_lock.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Fix test

* Fix tests

Co-authored-by: Gavin Wood <gavin@parity.io>
Co-authored-by: Guillaume Thiolliere <gui.thiolliere@gmail.com>
Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Kian Paimani
2021-05-18 14:22:06 +02:00
committed by GitHub
parent bee5c2dd71
commit b5f23bfd1c
5 changed files with 232 additions and 112 deletions
+1 -1
View File
@@ -284,7 +284,7 @@ pub fn testnet_genesis(
}).collect::<Vec<_>>(),
},
pallet_staking: StakingConfig {
validator_count: initial_authorities.len() as u32 * 2,
validator_count: initial_authorities.len() as u32,
minimum_validator_count: initial_authorities.len() as u32,
invulnerables: initial_authorities.iter().map(|x| x.0.clone()).collect(),
slash_reward_fraction: Perbill::from_percent(10),
@@ -22,6 +22,7 @@ frame-system = { version = "3.0.0", default-features = false, path = "../system"
sp-io = { version = "3.0.0", default-features = false, path = "../../primitives/io" }
sp-std = { version = "3.0.0", default-features = false, path = "../../primitives/std" }
sp-core = { version = "3.0.0", default-features = false, path = "../../primitives/core" }
sp-runtime = { version = "3.0.0", default-features = false, path = "../../primitives/runtime" }
sp-npos-elections = { version = "3.0.0", default-features = false, path = "../../primitives/npos-elections" }
sp-arithmetic = { version = "3.0.0", default-features = false, path = "../../primitives/arithmetic" }
@@ -56,6 +57,7 @@ std = [
"sp-io/std",
"sp-std/std",
"sp-core/std",
"sp-runtime/std",
"sp-npos-elections/std",
"sp-arithmetic/std",
@@ -653,38 +653,24 @@ pub mod pallet {
}
fn offchain_worker(now: T::BlockNumber) {
match Self::current_phase() {
Phase::Unsigned((true, opened)) if opened == now => {
// mine a new solution, cache it, and attempt to submit it
let initial_output = Self::try_acquire_offchain_lock(now)
.and_then(|_| Self::mine_check_save_submit());
log!(info, "initial OCW output at {:?}: {:?}", now, initial_output);
use sp_runtime::offchain::storage_lock::{StorageLock, BlockAndTime};
// create a lock with the maximum deadline of number of blocks in the unsigned phase.
// This should only come useful in an **abrupt** termination of execution, otherwise the
// guard will be dropped upon successful execution.
let mut lock = StorageLock::<BlockAndTime<frame_system::Pallet::<T>>>::with_block_deadline(
unsigned::OFFCHAIN_LOCK,
T::UnsignedPhase::get().saturated_into(),
);
match lock.try_lock() {
Ok(_guard) => {
Self::do_synchronized_offchain_worker(now);
},
Err(deadline) => {
log!(debug, "offchain worker lock not released, deadline is {:?}", deadline);
}
Phase::Unsigned((true, opened)) if opened < now => {
// keep trying to submit solutions. worst case, we note that the stored solution
// is better than our cached/computed one, and decide not to submit after all.
//
// the offchain_lock prevents us from spamming submissions too often.
let resubmit_output = Self::try_acquire_offchain_lock(now)
.and_then(|_| Self::restore_or_compute_then_maybe_submit());
log!(info, "resubmit OCW output at {:?}: {:?}", now, resubmit_output);
}
_ => {}
}
// after election finalization, clear OCW solution storage
if <frame_system::Pallet<T>>::events()
.into_iter()
.filter_map(|event_record| {
let local_event = <T as Config>::Event::from(event_record.event);
local_event.try_into().ok()
})
.find(|event| {
matches!(event, Event::ElectionFinalized(_))
})
.is_some()
{
unsigned::kill_ocw_solution::<T>();
}
};
}
fn integrity_test() {
@@ -929,6 +915,44 @@ pub mod pallet {
}
impl<T: Config> Pallet<T> {
/// Internal logic of the offchain worker, to be executed only when the offchain lock is
/// acquired with success.
fn do_synchronized_offchain_worker(now: T::BlockNumber) {
log!(trace, "lock for offchain worker acquired.");
match Self::current_phase() {
Phase::Unsigned((true, opened)) if opened == now => {
// mine a new solution, cache it, and attempt to submit it
let initial_output = Self::ensure_offchain_repeat_frequency(now).and_then(|_| {
Self::mine_check_save_submit()
});
log!(debug, "initial offchain thread output: {:?}", initial_output);
}
Phase::Unsigned((true, opened)) if opened < now => {
// try and resubmit the cached solution, and recompute ONLY if it is not
// feasible.
let resubmit_output = Self::ensure_offchain_repeat_frequency(now).and_then(|_| {
Self::restore_or_compute_then_maybe_submit()
});
log!(debug, "resubmit offchain thread output: {:?}", resubmit_output);
}
_ => {}
}
// after election finalization, clear OCW solution storage.
if <frame_system::Pallet<T>>::events()
.into_iter()
.filter_map(|event_record| {
let local_event = <T as Config>::Event::from(event_record.event);
local_event.try_into().ok()
})
.any(|event| {
matches!(event, Event::ElectionFinalized(_))
})
{
unsigned::kill_ocw_solution::<T>();
}
}
/// Logic for [`<Pallet as Hooks>::on_initialize`] when signed phase is being opened.
///
/// This is decoupled for easy weight calculation.
@@ -15,26 +15,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! The unsigned phase implementation.
//! The unsigned phase, and its miner.
use crate::{
helpers, Call, CompactAccuracyOf, CompactOf, Config,
ElectionCompute, Error, FeasibilityError, Pallet, RawSolution, ReadySolution, RoundSnapshot,
SolutionOrSnapshotSize, Weight, WeightInfo,
helpers, Call, CompactAccuracyOf, CompactOf, Config, ElectionCompute, Error, FeasibilityError,
Pallet, RawSolution, ReadySolution, RoundSnapshot, SolutionOrSnapshotSize, Weight, WeightInfo,
};
use codec::{Encode, Decode};
use frame_support::{dispatch::DispatchResult, ensure, traits::Get};
use frame_system::offchain::SubmitTransaction;
use sp_arithmetic::Perbill;
use sp_npos_elections::{
CompactSolution, ElectionResult, ElectionScore, assignment_ratio_to_staked_normalized,
CompactSolution, ElectionResult, assignment_ratio_to_staked_normalized,
assignment_staked_to_ratio_normalized, is_score_better, seq_phragmen,
};
use sp_runtime::{offchain::storage::StorageValueRef, traits::TrailingZeroInput, SaturatedConversion};
use sp_std::{cmp::Ordering, convert::TryFrom, vec::Vec};
/// Storage key used to store the persistent offchain worker status.
pub(crate) const OFFCHAIN_LOCK: &[u8] = b"parity/multi-phase-unsigned-election";
/// Storage key used to store the last block number at which offchain worker ran.
pub(crate) const OFFCHAIN_LAST_BLOCK: &[u8] = b"parity/multi-phase-unsigned-election";
/// Storage key used to store the offchain worker running status.
pub(crate) const OFFCHAIN_LOCK: &[u8] = b"parity/multi-phase-unsigned-election/lock";
/// Storage key used to cache the solution `call`.
pub(crate) const OFFCHAIN_CACHED_CALL: &[u8] = b"parity/multi-phase-unsigned-election/call";
@@ -72,8 +73,6 @@ pub enum MinerError {
Lock(&'static str),
/// Cannot restore a solution that was not stored.
NoStoredSolution,
/// Cached solution does not match the current round.
SolutionOutOfDate,
/// Cached solution is not a `submit_unsigned` call.
SolutionCallInvalid,
/// Failed to store a solution.
@@ -96,15 +95,16 @@ impl From<FeasibilityError> for MinerError {
/// Save a given call into OCW storage.
fn save_solution<T: Config>(call: &Call<T>) -> Result<(), MinerError> {
log!(debug, "saving a call to the offchain storage.");
let storage = StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL);
match storage.mutate::<_, (), _>(|_| Ok(call.clone())) {
Ok(Ok(_)) => Ok(()),
Ok(Err(_)) => Err(MinerError::FailedToStoreSolution),
Err(_) => {
// this branch should be unreachable according to the definition of `StorageValueRef::mutate`:
// that function should only ever `Err` if the closure we pass it return an error.
// however, for safety in case the definition changes, we do not optimize the branch away
// or panic.
// this branch should be unreachable according to the definition of
// `StorageValueRef::mutate`: that function should only ever `Err` if the closure we
// pass it returns an error. however, for safety in case the definition changes, we do
// not optimize the branch away or panic.
Err(MinerError::FailedToStoreSolution)
},
}
@@ -120,10 +120,20 @@ fn restore_solution<T: Config>() -> Result<Call<T>, MinerError> {
/// Clear a saved solution from OCW storage.
pub(super) fn kill_ocw_solution<T: Config>() {
log!(debug, "clearing offchain call cache storage.");
let mut storage = StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL);
storage.clear();
}
/// Clear the offchain repeat storage.
///
/// After calling this, the next offchain worker is guaranteed to work, with respect to the
/// frequency repeat.
fn clear_offchain_repeat_frequency() {
let mut last_block = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK);
last_block.clear();
}
/// `true` when OCW storage contains a solution
///
/// More precise than `restore_solution::<T>().is_ok()`; that invocation will return `false`
@@ -137,54 +147,59 @@ impl<T: Config> Pallet<T> {
/// Attempt to restore a solution from cache. Otherwise, compute it fresh. Either way, submit
/// if our call's score is greater than that of the cached solution.
pub fn restore_or_compute_then_maybe_submit() -> Result<(), MinerError> {
log!(
debug,
"OCW attempting to restore or compute an unsigned solution for the current election"
);
log!(debug,"miner attempting to restore or compute an unsigned solution.");
let call = restore_solution::<T>()
.and_then(|call| {
// ensure the cached call is still current before submitting
if let Call::submit_unsigned(solution, _) = &call {
// prevent errors arising from state changes in a forkful chain
Self::basic_checks(solution, "restored")?;
Ok(call)
} else {
Err(MinerError::SolutionCallInvalid)
}
})
.or_else::<MinerError, _>(|_| {
// if not present or cache invalidated, regenerate
let (call, _) = Self::mine_checked_call()?;
save_solution(&call)?;
.and_then(|call| {
// ensure the cached call is still current before submitting
if let Call::submit_unsigned(solution, _) = &call {
// prevent errors arising from state changes in a forkful chain
Self::basic_checks(solution, "restored")?;
Ok(call)
})?;
} else {
Err(MinerError::SolutionCallInvalid)
}
}).or_else::<MinerError, _>(|error| {
log!(debug, "restoring solution failed due to {:?}", error);
match error {
MinerError::NoStoredSolution => {
log!(trace, "mining a new solution.");
// if not present or cache invalidated due to feasibility, regenerate.
// note that failing `Feasibility` can only mean that the solution was
// computed over a snapshot that has changed due to a fork.
let call = Self::mine_checked_call()?;
save_solution(&call)?;
Ok(call)
}
MinerError::Feasibility(_) => {
log!(trace, "wiping infeasible solution.");
// kill the infeasible solution, hopefully in the next runs (whenever they
// may be) we mine a new one.
kill_ocw_solution::<T>();
clear_offchain_repeat_frequency();
Err(error)
},
_ => {
// nothing to do. Return the error as-is.
Err(error)
}
}
})?;
// the runtime will catch it and reject the transaction if the phase is wrong, but it's
// cheap and easy to check it here to ease the workload on the runtime, so:
if !Self::current_phase().is_unsigned_open() {
// don't bother submitting; it's not an error, we're just too late.
return Ok(());
}
// in case submission fails for any reason, `submit_call` kills the stored solution
Self::submit_call(call)
}
/// Mine a new solution, cache it, and submit it back to the chain as an unsigned transaction.
pub fn mine_check_save_submit() -> Result<(), MinerError> {
log!(
debug,
"OCW attempting to compute an unsigned solution for the current election"
);
log!(debug, "miner attempting to compute an unsigned solution.");
let (call, _) = Self::mine_checked_call()?;
let call = Self::mine_checked_call()?;
save_solution(&call)?;
Self::submit_call(call)
}
/// Mine a new solution as a call. Performs all checks.
fn mine_checked_call() -> Result<(Call<T>, ElectionScore), MinerError> {
fn mine_checked_call() -> Result<Call<T>, MinerError> {
let iters = Self::get_balancing_iters();
// get the solution, with a load of checks to ensure if submitted, IT IS ABSOLUTELY VALID.
let (raw_solution, witness) = Self::mine_and_check(iters)?;
@@ -194,38 +209,35 @@ impl<T: Config> Pallet<T> {
log!(
debug,
"OCW mined a solution with score {:?} and size {}",
"mined a solution with score {:?} and size {}",
score,
call.using_encoded(|b| b.len())
);
Ok((call, score))
Ok(call)
}
fn submit_call(call: Call<T>) -> Result<(), MinerError> {
log!(
debug,
"OCW submitting a solution as an unsigned transaction",
);
log!(debug, "miner submitting a solution as an unsigned transaction");
SubmitTransaction::<T, Call<T>>::submit_unsigned_transaction(call.into())
.map_err(|_| {
kill_ocw_solution::<T>();
MinerError::PoolSubmissionFailed
})
.map_err(|_| MinerError::PoolSubmissionFailed)
}
// perform basic checks of a solution's validity
//
// Performance: note that it internally clones the provided solution.
fn basic_checks(raw_solution: &RawSolution<CompactOf<T>>, solution_type: &str) -> Result<(), MinerError> {
fn basic_checks(
raw_solution: &RawSolution<CompactOf<T>>,
solution_type: &str,
) -> Result<(), MinerError> {
Self::unsigned_pre_dispatch_checks(raw_solution).map_err(|err| {
log!(warn, "pre-dispatch checks fialed for {} solution: {:?}", solution_type, err);
log!(debug, "pre-dispatch checks failed for {} solution: {:?}", solution_type, err);
MinerError::PreDispatchChecksFailed
})?;
Self::feasibility_check(raw_solution.clone(), ElectionCompute::Unsigned).map_err(|err| {
log!(warn, "feasibility check failed for {} solution: {:?}", solution_type, err);
log!(debug, "feasibility check failed for {} solution: {:?}", solution_type, err);
err
})?;
@@ -561,18 +573,18 @@ impl<T: Config> Pallet<T> {
/// Checks if an execution of the offchain worker is permitted at the given block number, or
/// not.
///
/// This essentially makes sure that we don't run on previous blocks in case of a re-org, and we
/// don't run twice within a window of length `threshold`.
/// This makes sure that
/// 1. we don't run on previous blocks in case of a re-org
/// 2. we don't run twice within a window of length `T::OffchainRepeat`.
///
/// Returns `Ok(())` if offchain worker should happen, `Err(reason)` otherwise.
pub(crate) fn try_acquire_offchain_lock(
now: T::BlockNumber,
) -> Result<(), MinerError> {
/// Returns `Ok(())` if offchain worker limit is respected, `Err(reason)` otherwise. If `Ok()`
/// is returned, `now` is written in storage and will be used in further calls as the baseline.
pub(crate) fn ensure_offchain_repeat_frequency(now: T::BlockNumber) -> Result<(), MinerError> {
let threshold = T::OffchainRepeat::get();
let storage = StorageValueRef::persistent(&OFFCHAIN_LOCK);
let last_block = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK);
let mutate_stat =
storage.mutate::<_, &'static str, _>(|maybe_head: Option<Option<T::BlockNumber>>| {
let mutate_stat = last_block.mutate::<_, &'static str, _>(
|maybe_head: Option<Option<T::BlockNumber>>| {
match maybe_head {
Some(Some(head)) if now < head => Err("fork."),
Some(Some(head)) if now >= head && now <= head + threshold => {
@@ -587,7 +599,8 @@ impl<T: Config> Pallet<T> {
Ok(now)
}
}
});
},
);
match mutate_stat {
// all good
@@ -731,11 +744,13 @@ mod tests {
mock::{
Call as OuterCall, ExtBuilder, Extrinsic, MinerMaxWeight, MultiPhase, Origin, Runtime,
TestCompact, TrimHelpers, roll_to, roll_to_with_ocw, trim_helpers, witness,
UnsignedPhase, BlockNumber, System,
},
};
use frame_benchmarking::Zero;
use frame_support::{assert_noop, assert_ok, dispatch::Dispatchable, traits::OffchainWorker};
use sp_npos_elections::IndexAssignment;
use sp_runtime::offchain::storage_lock::{StorageLock, BlockAndTime};
use sp_runtime::{traits::ValidateUnsigned, PerU16};
type Assignment = crate::unsigned::Assignment<Runtime>;
@@ -1052,7 +1067,7 @@ mod tests {
}
#[test]
fn ocw_check_prevent_duplicate() {
fn ocw_lock_prevents_frequent_execution() {
let (mut ext, _) = ExtBuilder::default().build_offchainify(0);
ext.execute_with(|| {
let offchain_repeat = <Runtime as Config>::OffchainRepeat::get();
@@ -1061,21 +1076,88 @@ mod tests {
assert!(MultiPhase::current_phase().is_unsigned());
// first execution -- okay.
assert!(MultiPhase::try_acquire_offchain_lock(25).is_ok());
assert!(MultiPhase::ensure_offchain_repeat_frequency(25).is_ok());
// next block: rejected.
assert_noop!(MultiPhase::try_acquire_offchain_lock(26), MinerError::Lock("recently executed."));
assert_noop!(
MultiPhase::ensure_offchain_repeat_frequency(26),
MinerError::Lock("recently executed.")
);
// allowed after `OFFCHAIN_REPEAT`
assert!(MultiPhase::try_acquire_offchain_lock((26 + offchain_repeat).into()).is_ok());
assert!(
MultiPhase::ensure_offchain_repeat_frequency((26 + offchain_repeat).into()).is_ok()
);
// a fork like situation: re-execute last 3.
assert!(MultiPhase::try_acquire_offchain_lock((26 + offchain_repeat - 3).into()).is_err());
assert!(MultiPhase::try_acquire_offchain_lock((26 + offchain_repeat - 2).into()).is_err());
assert!(MultiPhase::try_acquire_offchain_lock((26 + offchain_repeat - 1).into()).is_err());
assert!(MultiPhase::ensure_offchain_repeat_frequency(
(26 + offchain_repeat - 3).into()
)
.is_err());
assert!(MultiPhase::ensure_offchain_repeat_frequency(
(26 + offchain_repeat - 2).into()
)
.is_err());
assert!(MultiPhase::ensure_offchain_repeat_frequency(
(26 + offchain_repeat - 1).into()
)
.is_err());
})
}
#[test]
fn ocw_lock_released_after_successful_execution() {
// first, ensure that a successful execution releases the lock
let (mut ext, pool) = ExtBuilder::default().build_offchainify(0);
ext.execute_with(|| {
let guard = StorageValueRef::persistent(&OFFCHAIN_LOCK);
let last_block = StorageValueRef::persistent(OFFCHAIN_LAST_BLOCK);
roll_to(25);
assert!(MultiPhase::current_phase().is_unsigned());
// initially, the lock is not set.
assert!(guard.get::<bool>().is_none());
// a successful a-z execution.
MultiPhase::offchain_worker(25);
assert_eq!(pool.read().transactions.len(), 1);
// afterwards, the lock is not set either..
assert!(guard.get::<bool>().is_none());
assert_eq!(last_block.get::<BlockNumber>().unwrap().unwrap(), 25);
});
}
#[test]
fn ocw_lock_prevents_overlapping_execution() {
// ensure that if the guard is in hold, a new execution is not allowed.
let (mut ext, pool) = ExtBuilder::default().build_offchainify(0);
ext.execute_with(|| {
roll_to(25);
assert!(MultiPhase::current_phase().is_unsigned());
// artificially set the value, as if another thread is mid-way.
let mut lock = StorageLock::<BlockAndTime<System>>::with_block_deadline(
OFFCHAIN_LOCK,
UnsignedPhase::get().saturated_into(),
);
let guard = lock.lock();
// nothing submitted.
MultiPhase::offchain_worker(25);
assert_eq!(pool.read().transactions.len(), 0);
MultiPhase::offchain_worker(26);
assert_eq!(pool.read().transactions.len(), 0);
drop(guard);
// 🎉 !
MultiPhase::offchain_worker(25);
assert_eq!(pool.read().transactions.len(), 1);
});
}
#[test]
fn ocw_only_runs_when_unsigned_open_now() {
let (mut ext, pool) = ExtBuilder::default().build_offchainify(0);
@@ -1085,7 +1167,7 @@ mod tests {
// we must clear the offchain storage to ensure the offchain execution check doesn't get
// in the way.
let mut storage = StorageValueRef::persistent(&OFFCHAIN_LOCK);
let mut storage = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK);
MultiPhase::offchain_worker(24);
assert!(pool.read().transactions.len().is_zero());
@@ -1112,7 +1194,7 @@ mod tests {
// we must clear the offchain storage to ensure the offchain execution check doesn't get
// in the way.
let mut storage = StorageValueRef::persistent(&OFFCHAIN_LOCK);
let mut storage = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK);
storage.clear();
assert!(!ocw_solution_exists::<Runtime>(), "no solution should be present before we mine one");
@@ -1143,7 +1225,7 @@ mod tests {
// we must clear the offchain storage to ensure the offchain execution check doesn't get
// in the way.
let mut storage = StorageValueRef::persistent(&OFFCHAIN_LOCK);
let mut storage = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK);
MultiPhase::offchain_worker(block_plus(-1));
assert!(pool.read().transactions.len().is_zero());
@@ -1181,7 +1263,7 @@ mod tests {
// we must clear the offchain storage to ensure the offchain execution check doesn't get
// in the way.
let mut storage = StorageValueRef::persistent(&OFFCHAIN_LOCK);
let mut storage = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK);
MultiPhase::offchain_worker(block_plus(-1));
assert!(pool.read().transactions.len().is_zero());
@@ -66,6 +66,7 @@ use crate::traits::AtLeast32BitUnsigned;
use codec::{Codec, Decode, Encode};
use sp_core::offchain::{Duration, Timestamp};
use sp_io::offchain;
use sp_std::fmt;
/// Default expiry duration for time based locks in milliseconds.
const STORAGE_LOCK_DEFAULT_EXPIRY_DURATION: Duration = Duration::from_millis(20_000);
@@ -173,6 +174,17 @@ impl<B: BlockNumberProvider> Default for BlockAndTimeDeadline<B> {
}
}
impl<B: BlockNumberProvider> fmt::Debug for BlockAndTimeDeadline<B>
where <B as BlockNumberProvider>::BlockNumber: fmt::Debug
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BlockAndTimeDeadline")
.field("block_number", &self.block_number)
.field("timestamp", &self.timestamp)
.finish()
}
}
/// Lockable based on block number and timestamp.
///
/// Expiration is defined if both, block number _and_ timestamp