mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 13:31:10 +00:00
Handle local storage race conditions better (#3177)
* Make local_storage_compare_and_set take Option for old_value * Adapt srml/im-online to API changes * Bump version * Bump version again * Replace match
This commit is contained in:
committed by
Gavin Wood
parent
af914e9f40
commit
23fba990ba
@@ -143,12 +143,22 @@ impl_stubs!(
|
||||
runtime_io::local_storage_set(kind, b"test", b"asd");
|
||||
assert_eq!(runtime_io::local_storage_get(kind, b"test"), Some(b"asd".to_vec()));
|
||||
|
||||
let res = runtime_io::local_storage_compare_and_set(kind, b"test", b"asd", b"");
|
||||
let res = runtime_io::local_storage_compare_and_set(kind, b"test", Some(b"asd"), b"");
|
||||
assert_eq!(res, true);
|
||||
assert_eq!(runtime_io::local_storage_get(kind, b"test"), Some(b"".to_vec()));
|
||||
|
||||
[0].to_vec()
|
||||
},
|
||||
test_offchain_local_storage_with_none => |_| {
|
||||
let kind = substrate_primitives::offchain::StorageKind::PERSISTENT;
|
||||
assert_eq!(runtime_io::local_storage_get(kind, b"test"), None);
|
||||
|
||||
let res = runtime_io::local_storage_compare_and_set(kind, b"test", None, b"value");
|
||||
assert_eq!(res, true);
|
||||
assert_eq!(runtime_io::local_storage_get(kind, b"test"), Some(b"value".to_vec()));
|
||||
|
||||
[0].to_vec()
|
||||
},
|
||||
test_offchain_http => |_| {
|
||||
use substrate_primitives::offchain::HttpRequestStatus;
|
||||
let run = || -> Option<()> {
|
||||
|
||||
@@ -949,17 +949,24 @@ impl_function_executor!(this: FunctionExecutor<'e, E>,
|
||||
.map_err(|_| "storage kind OOB while ext_local_storage_compare_and_set: wasm")?;
|
||||
let key = this.memory.get(key, key_len as usize)
|
||||
.map_err(|_| "OOB while ext_local_storage_compare_and_set: wasm")?;
|
||||
let old_value = this.memory.get(old_value, old_value_len as usize)
|
||||
.map_err(|_| "OOB while ext_local_storage_compare_and_set: wasm")?;
|
||||
let new_value = this.memory.get(new_value, new_value_len as usize)
|
||||
.map_err(|_| "OOB while ext_local_storage_compare_and_set: wasm")?;
|
||||
|
||||
let res = this.ext.offchain()
|
||||
.map(|api| api.local_storage_compare_and_set(kind, &key, &old_value, &new_value))
|
||||
.ok_or_else(|| "Calling unavailable API ext_local_storage_compare_andset: wasm")?;
|
||||
let res = {
|
||||
if old_value == u32::max_value() {
|
||||
this.ext.offchain()
|
||||
.map(|api| api.local_storage_compare_and_set(kind, &key, None, &new_value))
|
||||
.ok_or_else(|| "Calling unavailable API ext_local_storage_compare_and_set: wasm")?
|
||||
} else {
|
||||
let v = this.memory.get(old_value, old_value_len as usize)
|
||||
.map_err(|_| "OOB while ext_local_storage_compare_and_set: wasm")?;
|
||||
this.ext.offchain()
|
||||
.map(|api| api.local_storage_compare_and_set(kind, &key, Some(v.as_slice()), &new_value))
|
||||
.ok_or_else(|| "Calling unavailable API ext_local_storage_compare_and_set: wasm")?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(if res { 0 } else { 1 })
|
||||
|
||||
},
|
||||
ext_http_request_start(
|
||||
method: *const u8,
|
||||
|
||||
@@ -355,12 +355,12 @@ where
|
||||
&mut self,
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8],
|
||||
) -> bool {
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => {
|
||||
self.db.compare_and_set(STORAGE_PREFIX, key, Some(old_value), new_value)
|
||||
self.db.compare_and_set(STORAGE_PREFIX, key, old_value, new_value)
|
||||
},
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
@@ -657,14 +657,29 @@ mod tests {
|
||||
api.local_storage_set(kind, key, b"value");
|
||||
|
||||
// when
|
||||
assert_eq!(api.local_storage_compare_and_set(kind, key, b"val", b"xxx"), false);
|
||||
assert_eq!(api.local_storage_compare_and_set(kind, key, Some(b"val"), b"xxx"), false);
|
||||
assert_eq!(api.local_storage_get(kind, key), Some(b"value".to_vec()));
|
||||
|
||||
// when
|
||||
assert_eq!(api.local_storage_compare_and_set(kind, key, b"value", b"xxx"), true);
|
||||
assert_eq!(api.local_storage_compare_and_set(kind, key, Some(b"value"), b"xxx"), true);
|
||||
assert_eq!(api.local_storage_get(kind, key), Some(b"xxx".to_vec()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_compare_and_set_local_storage_with_none() {
|
||||
// given
|
||||
let kind = StorageKind::PERSISTENT;
|
||||
let mut api = offchain_api().0;
|
||||
let key = b"test";
|
||||
|
||||
// when
|
||||
let res = api.local_storage_compare_and_set(kind, key, None, b"value");
|
||||
|
||||
// then
|
||||
assert_eq!(res, true);
|
||||
assert_eq!(api.local_storage_get(kind, key), Some(b"value".to_vec()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_create_a_new_key_and_sign_and_verify_stuff() {
|
||||
let test = |kind: CryptoKind| {
|
||||
|
||||
@@ -209,14 +209,14 @@ impl offchain::Externalities for TestOffchainExt {
|
||||
&mut self,
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8]
|
||||
) -> bool {
|
||||
let mut state = self.0.write();
|
||||
match kind {
|
||||
StorageKind::LOCAL => &mut state.local_storage,
|
||||
StorageKind::PERSISTENT => &mut state.persistent_storage,
|
||||
}.compare_and_set(b"", key, Some(old_value), new_value)
|
||||
}.compare_and_set(b"", key, old_value, new_value)
|
||||
}
|
||||
|
||||
fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option<Vec<u8>> {
|
||||
|
||||
@@ -384,7 +384,7 @@ pub trait Externalities {
|
||||
&mut self,
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8],
|
||||
) -> bool;
|
||||
|
||||
@@ -514,7 +514,7 @@ impl<T: Externalities + ?Sized> Externalities for Box<T> {
|
||||
&mut self,
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8],
|
||||
) -> bool {
|
||||
(&mut **self).local_storage_compare_and_set(kind, key, old_value, new_value)
|
||||
|
||||
@@ -306,7 +306,12 @@ export_api! {
|
||||
///
|
||||
/// Note this storage is not part of the consensus, it's only accessible by
|
||||
/// offchain worker tasks running on the same machine. It IS persisted between runs.
|
||||
fn local_storage_compare_and_set(kind: StorageKind, key: &[u8], old_value: &[u8], new_value: &[u8]) -> bool;
|
||||
fn local_storage_compare_and_set(
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8]
|
||||
) -> bool;
|
||||
|
||||
/// Gets a value from the local storage.
|
||||
///
|
||||
|
||||
@@ -351,7 +351,7 @@ impl OffchainApi for () {
|
||||
fn local_storage_compare_and_set(
|
||||
kind: offchain::StorageKind,
|
||||
key: &[u8],
|
||||
old_value: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8],
|
||||
) -> bool {
|
||||
with_offchain(|ext| {
|
||||
|
||||
@@ -1055,14 +1055,27 @@ impl OffchainApi for () {
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_compare_and_set(kind: offchain::StorageKind, key: &[u8], old_value: &[u8], new_value: &[u8]) -> bool {
|
||||
fn local_storage_compare_and_set(
|
||||
kind: offchain::StorageKind,
|
||||
key: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8],
|
||||
) -> bool {
|
||||
let (ptr, len) = match old_value {
|
||||
Some(old_value) => (
|
||||
old_value.as_ptr(),
|
||||
old_value.len() as u32,
|
||||
),
|
||||
None => (0 as *const u8, u32::max_value()),
|
||||
};
|
||||
|
||||
unsafe {
|
||||
ext_local_storage_compare_and_set.get()(
|
||||
kind.into(),
|
||||
key.as_ptr(),
|
||||
key.len() as u32,
|
||||
old_value.as_ptr(),
|
||||
old_value.len() as u32,
|
||||
ptr,
|
||||
len,
|
||||
new_value.as_ptr(),
|
||||
new_value.len() as u32,
|
||||
) == 0
|
||||
|
||||
@@ -313,7 +313,7 @@ impl offchain::Externalities for NeverOffchainExt {
|
||||
&mut self,
|
||||
_kind: offchain::StorageKind,
|
||||
_key: &[u8],
|
||||
_old_value: &[u8],
|
||||
_old_value: Option<&[u8]>,
|
||||
_new_value: &[u8],
|
||||
) -> bool {
|
||||
unreachable!()
|
||||
|
||||
@@ -80,7 +80,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
|
||||
// implementation changes and behavior does not, then leave spec_version as
|
||||
// is and increment impl_version.
|
||||
spec_version: 121,
|
||||
impl_version: 121,
|
||||
impl_version: 122,
|
||||
apis: RUNTIME_API_VERSIONS,
|
||||
};
|
||||
|
||||
|
||||
@@ -233,27 +233,55 @@ decl_module! {
|
||||
.ok_or(OffchainErr::ExtrinsicCreation)?;
|
||||
sr_io::submit_transaction(&ex)
|
||||
.map_err(|_| OffchainErr::SubmitTransaction)?;
|
||||
|
||||
// once finished we set the worker status without comparing
|
||||
// if the existing value changed in the meantime. this is
|
||||
// because at this point the heartbeat was definitely submitted.
|
||||
set_worker_status::<T>(block_number, true);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_worker_status<T: Trait>(gossipping_at: T::BlockNumber, done: bool) {
|
||||
fn compare_and_set_worker_status<T: Trait>(
|
||||
gossipping_at: T::BlockNumber,
|
||||
done: bool,
|
||||
curr_worker_status: Option<Vec<u8>>,
|
||||
) -> bool {
|
||||
let enc = WorkerStatus {
|
||||
done,
|
||||
gossipping_at,
|
||||
};
|
||||
sr_io::local_storage_set(StorageKind::PERSISTENT, DB_KEY, &enc.encode());
|
||||
sr_io::local_storage_compare_and_set(
|
||||
StorageKind::PERSISTENT,
|
||||
DB_KEY,
|
||||
curr_worker_status.as_ref().map(Vec::as_slice),
|
||||
&enc.encode()
|
||||
)
|
||||
}
|
||||
|
||||
fn was_not_yet_gossipped<T: Trait>(
|
||||
fn set_worker_status<T: Trait>(
|
||||
gossipping_at: T::BlockNumber,
|
||||
done: bool,
|
||||
) {
|
||||
let enc = WorkerStatus {
|
||||
done,
|
||||
gossipping_at,
|
||||
};
|
||||
sr_io::local_storage_set(
|
||||
StorageKind::PERSISTENT, DB_KEY, &enc.encode());
|
||||
}
|
||||
|
||||
// Checks if a heartbeat gossip already occurred at this block number.
|
||||
// Returns a tuple of `(current worker status, bool)`, whereby the bool
|
||||
// is true if not yet gossipped.
|
||||
fn check_not_yet_gossipped<T: Trait>(
|
||||
now: T::BlockNumber,
|
||||
next_gossip: T::BlockNumber,
|
||||
) -> Result<bool, OffchainErr> {
|
||||
) -> Result<(Option<Vec<u8>>, bool), OffchainErr> {
|
||||
let last_gossip = sr_io::local_storage_get(StorageKind::PERSISTENT, DB_KEY);
|
||||
match last_gossip {
|
||||
Some(l) => {
|
||||
let worker_status: WorkerStatus<T::BlockNumber> = Decode::decode(&mut &l[..])
|
||||
Some(last) => {
|
||||
let worker_status: WorkerStatus<T::BlockNumber> = Decode::decode(&mut &last[..])
|
||||
.ok_or(OffchainErr::DecodeWorkerStatus)?;
|
||||
|
||||
let was_aborted = !worker_status.done && worker_status.gossipping_at < now;
|
||||
@@ -266,22 +294,29 @@ decl_module! {
|
||||
worker_status.done && worker_status.gossipping_at < next_gossip;
|
||||
|
||||
let ret = (was_aborted && !already_submitting) || not_yet_gossipped;
|
||||
Ok(ret)
|
||||
Ok((Some(last), ret))
|
||||
},
|
||||
None => Ok(true),
|
||||
None => Ok((None, true)),
|
||||
}
|
||||
}
|
||||
|
||||
let next_gossip = <GossipAt<T>>::get();
|
||||
let not_yet_gossipped = match was_not_yet_gossipped::<T>(now, next_gossip) {
|
||||
Ok(v) => v,
|
||||
let check = check_not_yet_gossipped::<T>(now, next_gossip);
|
||||
let (curr_worker_status, not_yet_gossipped) = match check {
|
||||
Ok((s, v)) => (s, v),
|
||||
Err(err) => {
|
||||
print(err);
|
||||
return;
|
||||
},
|
||||
};
|
||||
if next_gossip < now && not_yet_gossipped {
|
||||
set_worker_status::<T>(now, false);
|
||||
let value_set = compare_and_set_worker_status::<T>(now, false, curr_worker_status);
|
||||
if !value_set {
|
||||
// value could not be set in local storage, since the value was
|
||||
// different from `curr_worker_status`. this indicates that
|
||||
// another worker was running in parallel.
|
||||
return;
|
||||
}
|
||||
|
||||
match gossip_at::<T>(now) {
|
||||
Ok(_) => {},
|
||||
|
||||
Reference in New Issue
Block a user