mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 13:21:10 +00:00
Result<Option<>> rather than Option<Option<>> (#9119)
* Clearer API to code against.
This commit is contained in:
@@ -29,7 +29,10 @@ use sp_npos_elections::{
|
||||
CompactSolution, ElectionResult, assignment_ratio_to_staked_normalized,
|
||||
assignment_staked_to_ratio_normalized, is_score_better, seq_phragmen,
|
||||
};
|
||||
use sp_runtime::{offchain::storage::StorageValueRef, traits::TrailingZeroInput, SaturatedConversion};
|
||||
use sp_runtime::{
|
||||
offchain::storage::{MutateStorageError, StorageValueRef},
|
||||
traits::TrailingZeroInput, SaturatedConversion
|
||||
};
|
||||
use sp_std::{cmp::Ordering, convert::TryFrom, vec::Vec};
|
||||
|
||||
/// Storage key used to store the last block number at which offchain worker ran.
|
||||
@@ -98,9 +101,9 @@ fn save_solution<T: Config>(call: &Call<T>) -> Result<(), MinerError> {
|
||||
log!(debug, "saving a call to the offchain storage.");
|
||||
let storage = StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL);
|
||||
match storage.mutate::<_, (), _>(|_| Ok(call.clone())) {
|
||||
Ok(Ok(_)) => Ok(()),
|
||||
Ok(Err(_)) => Err(MinerError::FailedToStoreSolution),
|
||||
Err(_) => {
|
||||
Ok(_) => Ok(()),
|
||||
Err(MutateStorageError::ConcurrentModification(_)) => Err(MinerError::FailedToStoreSolution),
|
||||
Err(MutateStorageError::ValueFunctionFailed(_)) => {
|
||||
// this branch should be unreachable according to the definition of
|
||||
// `StorageValueRef::mutate`: that function should only ever `Err` if the closure we
|
||||
// pass it returns an error. however, for safety in case the definition changes, we do
|
||||
@@ -114,6 +117,7 @@ fn save_solution<T: Config>(call: &Call<T>) -> Result<(), MinerError> {
|
||||
fn restore_solution<T: Config>() -> Result<Call<T>, MinerError> {
|
||||
StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL)
|
||||
.get()
|
||||
.ok()
|
||||
.flatten()
|
||||
.ok_or(MinerError::NoStoredSolution)
|
||||
}
|
||||
@@ -135,12 +139,9 @@ fn clear_offchain_repeat_frequency() {
|
||||
}
|
||||
|
||||
/// `true` when OCW storage contains a solution
|
||||
///
|
||||
/// More precise than `restore_solution::<T>().is_ok()`; that invocation will return `false`
|
||||
/// if a solution exists but cannot be decoded, whereas this just checks whether an item is present.
|
||||
#[cfg(test)]
|
||||
fn ocw_solution_exists<T: Config>() -> bool {
|
||||
StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL).get::<Call<T>>().is_some()
|
||||
matches!(StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL).get::<Call<T>>(), Ok(Some(_)))
|
||||
}
|
||||
|
||||
impl<T: Config> Pallet<T> {
|
||||
@@ -584,13 +585,13 @@ impl<T: Config> Pallet<T> {
|
||||
let last_block = StorageValueRef::persistent(&OFFCHAIN_LAST_BLOCK);
|
||||
|
||||
let mutate_stat = last_block.mutate::<_, &'static str, _>(
|
||||
|maybe_head: Option<Option<T::BlockNumber>>| {
|
||||
|maybe_head: Result<Option<T::BlockNumber>, _>| {
|
||||
match maybe_head {
|
||||
Some(Some(head)) if now < head => Err("fork."),
|
||||
Some(Some(head)) if now >= head && now <= head + threshold => {
|
||||
Ok(Some(head)) if now < head => Err("fork."),
|
||||
Ok(Some(head)) if now >= head && now <= head + threshold => {
|
||||
Err("recently executed.")
|
||||
}
|
||||
Some(Some(head)) if now > head + threshold => {
|
||||
Ok(Some(head)) if now > head + threshold => {
|
||||
// we can run again now. Write the new head.
|
||||
Ok(now)
|
||||
}
|
||||
@@ -604,11 +605,12 @@ impl<T: Config> Pallet<T> {
|
||||
|
||||
match mutate_stat {
|
||||
// all good
|
||||
Ok(Ok(_)) => Ok(()),
|
||||
Ok(_) => Ok(()),
|
||||
// failed to write.
|
||||
Ok(Err(_)) => Err(MinerError::Lock("failed to write to offchain db.")),
|
||||
Err(MutateStorageError::ConcurrentModification(_)) =>
|
||||
Err(MinerError::Lock("failed to write to offchain db (concurrent modification).")),
|
||||
// fork etc.
|
||||
Err(why) => Err(MinerError::Lock(why)),
|
||||
Err(MutateStorageError::ValueFunctionFailed(why)) => Err(MinerError::Lock(why)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1117,15 +1119,15 @@ mod tests {
|
||||
assert!(MultiPhase::current_phase().is_unsigned());
|
||||
|
||||
// initially, the lock is not set.
|
||||
assert!(guard.get::<bool>().is_none());
|
||||
assert!(guard.get::<bool>().unwrap().is_none());
|
||||
|
||||
// a successful a-z execution.
|
||||
MultiPhase::offchain_worker(25);
|
||||
assert_eq!(pool.read().transactions.len(), 1);
|
||||
|
||||
// afterwards, the lock is not set either..
|
||||
assert!(guard.get::<bool>().is_none());
|
||||
assert_eq!(last_block.get::<BlockNumber>().unwrap().unwrap(), 25);
|
||||
assert!(guard.get::<bool>().unwrap().is_none());
|
||||
assert_eq!(last_block.get::<BlockNumber>().unwrap(), Some(25));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1280,7 +1282,7 @@ mod tests {
|
||||
// this ensures that when the resubmit window rolls around, we're ready to regenerate
|
||||
// from scratch if necessary
|
||||
let mut call_cache = StorageValueRef::persistent(&OFFCHAIN_CACHED_CALL);
|
||||
assert!(matches!(call_cache.get::<Call<Runtime>>(), Some(Some(_call))));
|
||||
assert!(matches!(call_cache.get::<Call<Runtime>>(), Ok(Some(_call))));
|
||||
call_cache.clear();
|
||||
|
||||
// attempts to resubmit the tx after the threshold has expired
|
||||
|
||||
@@ -53,7 +53,7 @@ use frame_support::traits::Get;
|
||||
use sp_core::crypto::KeyTypeId;
|
||||
use sp_runtime::{
|
||||
RuntimeDebug,
|
||||
offchain::{http, Duration, storage::StorageValueRef},
|
||||
offchain::{http, Duration, storage::{MutateStorageError, StorageRetrievalError, StorageValueRef}},
|
||||
traits::Zero,
|
||||
transaction_validity::{InvalidTransaction, ValidTransaction, TransactionValidity},
|
||||
};
|
||||
@@ -366,15 +366,11 @@ impl<T: Config> Pallet<T> {
|
||||
// low-level method of local storage API, which means that only one worker
|
||||
// will be able to "acquire a lock" and send a transaction if multiple workers
|
||||
// happen to be executed concurrently.
|
||||
let res = val.mutate(|last_send: Option<Option<T::BlockNumber>>| {
|
||||
// We match on the value decoded from the storage. The first `Option`
|
||||
// indicates if the value was present in the storage at all,
|
||||
// the second (inner) `Option` indicates if the value was succesfuly
|
||||
// decoded to expected type (`T::BlockNumber` in our case).
|
||||
let res = val.mutate(|last_send: Result<Option<T::BlockNumber>, StorageRetrievalError>| {
|
||||
match last_send {
|
||||
// If we already have a value in storage and the block number is recent enough
|
||||
// we avoid sending another transaction at this time.
|
||||
Some(Some(block)) if block_number < block + T::GracePeriod::get() => {
|
||||
Ok(Some(block)) if block_number < block + T::GracePeriod::get() => {
|
||||
Err(RECENTLY_SENT)
|
||||
},
|
||||
// In every other case we attempt to acquire the lock and send a transaction.
|
||||
@@ -390,7 +386,7 @@ impl<T: Config> Pallet<T> {
|
||||
// written to in the meantime.
|
||||
match res {
|
||||
// The value has been set correctly, which means we can safely send a transaction now.
|
||||
Ok(Ok(block_number)) => {
|
||||
Ok(block_number) => {
|
||||
// Depending if the block is even or odd we will send a `Signed` or `Unsigned`
|
||||
// transaction.
|
||||
// Note that this logic doesn't really guarantee that the transactions will be sent
|
||||
@@ -406,13 +402,13 @@ impl<T: Config> Pallet<T> {
|
||||
else { TransactionType::Raw }
|
||||
},
|
||||
// We are in the grace period, we should not send a transaction this time.
|
||||
Err(RECENTLY_SENT) => TransactionType::None,
|
||||
Err(MutateStorageError::ValueFunctionFailed(RECENTLY_SENT)) => TransactionType::None,
|
||||
// We wanted to send a transaction, but failed to write the block number (acquire a
|
||||
// lock). This indicates that another offchain worker that was running concurrently
|
||||
// most likely executed the same logic and succeeded at writing to storage.
|
||||
// Thus we don't really want to send the transaction, knowing that the other run
|
||||
// already did.
|
||||
Ok(Err(_)) => TransactionType::None,
|
||||
Err(MutateStorageError::ConcurrentModification(_)) => TransactionType::None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -80,7 +80,7 @@ use sp_core::offchain::OpaqueNetworkState;
|
||||
use sp_std::prelude::*;
|
||||
use sp_std::convert::TryInto;
|
||||
use sp_runtime::{
|
||||
offchain::storage::StorageValueRef,
|
||||
offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
|
||||
traits::{AtLeast32BitUnsigned, Convert, Saturating, TrailingZeroInput},
|
||||
Perbill, Permill, PerThing, RuntimeDebug, SaturatedConversion,
|
||||
};
|
||||
@@ -719,14 +719,15 @@ impl<T: Config> Pallet<T> {
|
||||
key
|
||||
};
|
||||
let storage = StorageValueRef::persistent(&key);
|
||||
let res = storage.mutate(|status: Option<Option<HeartbeatStatus<T::BlockNumber>>>| {
|
||||
let res = storage.mutate(
|
||||
|status: Result<Option<HeartbeatStatus<T::BlockNumber>>, StorageRetrievalError>| {
|
||||
// Check if there is already a lock for that particular block.
|
||||
// This means that the heartbeat has already been sent, and we are just waiting
|
||||
// for it to be included. However if it doesn't get included for INCLUDE_THRESHOLD
|
||||
// we will re-send it.
|
||||
match status {
|
||||
// we are still waiting for inclusion.
|
||||
Some(Some(status)) if status.is_recent(session_index, now) => {
|
||||
Ok(Some(status)) if status.is_recent(session_index, now) => {
|
||||
Err(OffchainErr::WaitingForInclusion(status.sent_at))
|
||||
},
|
||||
// attempt to set new status
|
||||
@@ -735,7 +736,10 @@ impl<T: Config> Pallet<T> {
|
||||
sent_at: now,
|
||||
}),
|
||||
}
|
||||
})?;
|
||||
});
|
||||
if let Err(MutateStorageError::ValueFunctionFailed(err)) = res {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
let mut new_status = res.map_err(|_| OffchainErr::FailedToAcquireLock)?;
|
||||
|
||||
|
||||
@@ -25,7 +25,10 @@
|
||||
//! This is used in conjunction with [`ProvingTrie`](super::ProvingTrie) and
|
||||
//! the off-chain indexing API.
|
||||
|
||||
use sp_runtime::{offchain::storage::StorageValueRef, KeyTypeId};
|
||||
use sp_runtime::{
|
||||
offchain::storage::{MutateStorageError, StorageRetrievalError, StorageValueRef},
|
||||
KeyTypeId
|
||||
};
|
||||
use sp_session::MembershipProof;
|
||||
|
||||
use super::super::{Pallet as SessionModule, SessionIndex};
|
||||
@@ -49,6 +52,7 @@ impl<T: Config> ValidatorSet<T> {
|
||||
let derived_key = shared::derive_key(shared::PREFIX, session_index);
|
||||
StorageValueRef::persistent(derived_key.as_ref())
|
||||
.get::<Vec<(T::ValidatorId, T::FullIdentification)>>()
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|validator_set| Self { validator_set })
|
||||
}
|
||||
@@ -100,19 +104,19 @@ pub fn prove_session_membership<T: Config, D: AsRef<[u8]>>(
|
||||
pub fn prune_older_than<T: Config>(first_to_keep: SessionIndex) {
|
||||
let derived_key = shared::LAST_PRUNE.to_vec();
|
||||
let entry = StorageValueRef::persistent(derived_key.as_ref());
|
||||
match entry.mutate(|current: Option<Option<SessionIndex>>| -> Result<_, ()> {
|
||||
match entry.mutate(|current: Result<Option<SessionIndex>, StorageRetrievalError>| -> Result<_, ()> {
|
||||
match current {
|
||||
Some(Some(current)) if current < first_to_keep => Ok(first_to_keep),
|
||||
Ok(Some(current)) if current < first_to_keep => Ok(first_to_keep),
|
||||
// do not move the cursor, if the new one would be behind ours
|
||||
Some(Some(current)) => Ok(current),
|
||||
None => Ok(first_to_keep),
|
||||
Ok(Some(current)) => Ok(current),
|
||||
Ok(None) => Ok(first_to_keep),
|
||||
// if the storage contains undecodable data, overwrite with current anyways
|
||||
// which might leak some entries being never purged, but that is acceptable
|
||||
// in this context
|
||||
Some(None) => Ok(first_to_keep),
|
||||
Err(_) => Ok(first_to_keep),
|
||||
}
|
||||
}) {
|
||||
Ok(Ok(new_value)) => {
|
||||
Ok(new_value) => {
|
||||
// on a re-org this is not necessarily true, with the above they might be equal
|
||||
if new_value < first_to_keep {
|
||||
for session_index in new_value..first_to_keep {
|
||||
@@ -121,8 +125,8 @@ pub fn prune_older_than<T: Config>(first_to_keep: SessionIndex) {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Err(_)) => {} // failed to store the value calculated with the given closure
|
||||
Err(_) => {} // failed to calculate the value to store with the given closure
|
||||
Err(MutateStorageError::ConcurrentModification(_)) => {}
|
||||
Err(MutateStorageError::ValueFunctionFailed(_)) => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user