Allow transaction for offchain indexing (#7290)

* Moving offchain change set to state machine overlay change set,
preparing use of change set internally.

* Make change set generic over key and value, and use it for offchain
indexing.

* test ui change

* remaining delta

* generating with standard method

* Remove 'drain_committed' function, and documentation.

* Default constructor for enabling offchain indexing.

* Remove offchain change specific iterators.

* remove pub accessor

* keep previous hierarchy, just expose iterator instead.

* Update primitives/state-machine/src/overlayed_changes/mod.rs

Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>

* fix line break

* missing renamings

* fix import

* fix new state-machine tests.

* Don't expose InnerValue type.

* Add test similar to set_storage.

* Remove conditional offchain storage (hard to instantiate correctly).

* fix

* offchain as children cannot fail if top doesn't

Co-authored-by: Addie Wagenknecht <addie@nortd.com>
Co-authored-by: Tomasz Drwięga <tomusdrw@users.noreply.github.com>
This commit is contained in:
cheme
2021-01-22 13:27:43 +01:00
committed by GitHub
parent 20f40fbd12
commit 878f7ccf6e
13 changed files with 382 additions and 249 deletions
+3 -3
View File
@@ -21,12 +21,12 @@
use std::sync::Arc;
use std::collections::{HashMap, HashSet};
use sp_core::ChangesTrieConfigurationRange;
use sp_core::offchain::{OffchainStorage,storage::OffchainOverlayedChanges};
use sp_core::offchain::OffchainStorage;
use sp_runtime::{generic::BlockId, Justification, Storage};
use sp_runtime::traits::{Block as BlockT, NumberFor, HashFor};
use sp_state_machine::{
ChangesTrieState, ChangesTrieStorage as StateChangesTrieStorage, ChangesTrieTransaction,
StorageCollection, ChildStorageCollection,
StorageCollection, ChildStorageCollection, OffchainChangesCollection,
};
use sp_storage::{StorageData, StorageKey, PrefixedStorageKey, ChildInfo};
use crate::{
@@ -174,7 +174,7 @@ pub trait BlockImportOperation<Block: BlockT> {
/// Write offchain storage changes to the database.
fn update_offchain_storage(
&mut self,
_offchain_update: OffchainOverlayedChanges,
_offchain_update: OffchainChangesCollection,
) -> sp_blockchain::Result<()> {
Ok(())
}
+5 -5
View File
@@ -67,7 +67,7 @@ use hash_db::Prefix;
use sp_trie::{MemoryDB, PrefixedMemoryDB, prefixed_key};
use sp_database::Transaction;
use sp_core::{Hasher, ChangesTrieConfiguration};
use sp_core::offchain::storage::{OffchainOverlayedChange, OffchainOverlayedChanges};
use sp_core::offchain::OffchainOverlayedChange;
use sp_core::storage::{well_known_keys, ChildInfo};
use sp_arithmetic::traits::Saturating;
use sp_runtime::{generic::{DigestItem, BlockId}, Justification, Storage};
@@ -76,7 +76,7 @@ use sp_runtime::traits::{
};
use sp_state_machine::{
DBValue, ChangesTrieTransaction, ChangesTrieCacheAction, UsageInfo as StateUsageInfo,
StorageCollection, ChildStorageCollection,
StorageCollection, ChildStorageCollection, OffchainChangesCollection,
backend::Backend as StateBackend, StateMachineStats,
};
use crate::utils::{DatabaseType, Meta, meta_keys, read_db, read_meta};
@@ -667,7 +667,7 @@ pub struct BlockImportOperation<Block: BlockT> {
db_updates: PrefixedMemoryDB<HashFor<Block>>,
storage_updates: StorageCollection,
child_storage_updates: ChildStorageCollection,
offchain_storage_updates: OffchainOverlayedChanges,
offchain_storage_updates: OffchainChangesCollection,
changes_trie_updates: MemoryDB<HashFor<Block>>,
changes_trie_build_cache_update: Option<ChangesTrieCacheAction<Block::Hash, NumberFor<Block>>>,
changes_trie_config_update: Option<Option<ChangesTrieConfiguration>>,
@@ -680,7 +680,7 @@ pub struct BlockImportOperation<Block: BlockT> {
impl<Block: BlockT> BlockImportOperation<Block> {
fn apply_offchain(&mut self, transaction: &mut Transaction<DbHash>) {
for ((prefix, key), value_operation) in self.offchain_storage_updates.drain() {
for ((prefix, key), value_operation) in self.offchain_storage_updates.drain(..) {
let key = crate::offchain::concatenate_prefix_and_key(&prefix, &key);
match value_operation {
OffchainOverlayedChange::SetValue(val) =>
@@ -798,7 +798,7 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block> for Bloc
fn update_offchain_storage(
&mut self,
offchain_update: OffchainOverlayedChanges,
offchain_update: OffchainChangesCollection,
) -> ClientResult<()> {
self.offchain_storage_updates = offchain_update;
Ok(())
@@ -475,13 +475,11 @@ fn offchain_index(wasm_method: WasmExecutionMethod) {
&mut ext.ext(),
).unwrap();
use sp_core::offchain::storage::OffchainOverlayedChange;
assert_eq!(
ext.overlayed_changes()
.offchain_overlay()
.get(sp_core::offchain::STORAGE_PREFIX, b"k"),
Some(OffchainOverlayedChange::SetValue(b"v".to_vec()))
);
use sp_core::offchain::OffchainOverlayedChange;
let data = ext.overlayed_changes().clone().offchain_drain_committed().find(|(k, _v)| {
k == &(sp_core::offchain::STORAGE_PREFIX.to_vec(), b"k".to_vec())
});
assert_eq!(data.map(|data| data.1), Some(OffchainOverlayedChange::SetValue(b"v".to_vec())));
}
test_wasm_execution!(offchain_local_storage_should_work);
@@ -746,6 +746,14 @@ impl TransactionPoolExt {
}
}
/// Change to be applied to the offchain worker db in regards to a key.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum OffchainOverlayedChange {
/// Remove the data associated with the key
Remove,
/// Overwrite the value of an associated key
SetValue(Vec<u8>),
}
#[cfg(test)]
mod tests {
@@ -83,98 +83,3 @@ impl OffchainStorage for InMemOffchainStorage {
}
}
}
/// Change to be applied to the offchain worker db in regards to a key.
#[derive(Debug,Clone,Hash,Eq,PartialEq)]
pub enum OffchainOverlayedChange {
/// Remove the data associated with the key
Remove,
/// Overwrite the value of an associated key
SetValue(Vec<u8>),
}
/// In-memory storage for offchain workers recoding changes for the actual offchain storage implementation.
#[derive(Debug, Clone, Default)]
pub struct OffchainOverlayedChanges {
changes: HashMap<(Vec<u8>, Vec<u8>), OffchainOverlayedChange>,
}
impl OffchainOverlayedChanges {
/// Consume the offchain storage and iterate over all key value pairs.
pub fn into_iter(self) -> impl Iterator<Item = ((Vec<u8>, Vec<u8>), OffchainOverlayedChange)> {
self.changes.into_iter()
}
/// Iterate over all key value pairs by reference.
pub fn iter(&self) -> impl Iterator<Item = (&(Vec<u8>, Vec<u8>), &OffchainOverlayedChange)> {
self.changes.iter()
}
/// Drain all elements of changeset.
pub fn drain(&mut self) -> impl Iterator<Item = ((Vec<u8>, Vec<u8>), OffchainOverlayedChange)> + '_ {
self.changes.drain()
}
/// Remove a key and its associated value from the offchain database.
pub fn remove(&mut self, prefix: &[u8], key: &[u8]) {
self.changes.insert((prefix.to_vec(), key.to_vec()), OffchainOverlayedChange::Remove);
}
/// Set the value associated with a key under a prefix to the value provided.
pub fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) {
self.changes.insert(
(prefix.to_vec(), key.to_vec()),
OffchainOverlayedChange::SetValue(value.to_vec()),
);
}
/// Obtain a associated value to the given key in storage with prefix.
pub fn get(&self, prefix: &[u8], key: &[u8]) -> Option<OffchainOverlayedChange> {
let key = (prefix.to_vec(), key.to_vec());
self.changes.get(&key).cloned()
}
}
#[cfg(test)]
mod test {
use super::*;
use super::super::STORAGE_PREFIX;
#[test]
fn test_drain() {
let mut ooc = OffchainOverlayedChanges::default();
ooc.set(STORAGE_PREFIX,b"kkk", b"vvv");
let drained = ooc.drain().count();
assert_eq!(drained, 1);
let leftover = ooc.iter().count();
assert_eq!(leftover, 0);
ooc.set(STORAGE_PREFIX, b"a", b"v");
ooc.set(STORAGE_PREFIX, b"b", b"v");
ooc.set(STORAGE_PREFIX, b"c", b"v");
ooc.set(STORAGE_PREFIX, b"d", b"v");
ooc.set(STORAGE_PREFIX, b"e", b"v");
assert_eq!(ooc.iter().count(), 5);
}
#[test]
fn test_accumulated_set_remove_set() {
let mut ooc = OffchainOverlayedChanges::default();
ooc.set(STORAGE_PREFIX, b"ppp", b"qqq");
ooc.remove(STORAGE_PREFIX, b"ppp");
// keys are equiv, so it will overwrite the value and the overlay will contain
// one item
assert_eq!(ooc.iter().count(), 1);
ooc.set(STORAGE_PREFIX, b"ppp", b"rrr");
let mut iter = ooc.into_iter();
assert_eq!(
iter.next(),
Some(
((STORAGE_PREFIX.to_vec(), b"ppp".to_vec()),
OffchainOverlayedChange::SetValue(b"rrr".to_vec()))
)
);
assert_eq!(iter.next(), None);
}
}
@@ -27,7 +27,8 @@ use std::{
use crate::OpaquePeerId;
use crate::offchain::{
self,
storage::{InMemOffchainStorage, OffchainOverlayedChange, OffchainOverlayedChanges},
OffchainOverlayedChange,
storage::InMemOffchainStorage,
HttpError,
HttpRequestId as RequestId,
HttpRequestStatus as RequestStatus,
@@ -80,9 +81,12 @@ impl TestPersistentOffchainDB {
}
/// Apply a set of off-chain changes directly to the test backend
pub fn apply_offchain_changes(&mut self, changes: &mut OffchainOverlayedChanges) {
pub fn apply_offchain_changes(
&mut self,
changes: impl Iterator<Item = ((Vec<u8>, Vec<u8>), OffchainOverlayedChange)>,
) {
let mut me = self.persistent.write();
for ((_prefix, key), value_operation) in changes.drain() {
for ((_prefix, key), value_operation) in changes {
match value_operation {
OffchainOverlayedChange::SetValue(val) => me.set(Self::PREFIX, key.as_slice(), val.as_slice()),
OffchainOverlayedChange::Remove => me.remove(Self::PREFIX, key.as_slice()),
+1 -1
View File
@@ -21,7 +21,7 @@
#![warn(missing_docs)]
/// Re-export of parent module scope storage prefix.
pub use sp_core::offchain::STORAGE_PREFIX as STORAGE_PREFIX;
pub use sp_core::offchain::STORAGE_PREFIX;
sp_api::decl_runtime_apis! {
/// The offchain worker api.
+3 -22
View File
@@ -193,18 +193,10 @@ where
B: Backend<H>,
N: crate::changes_trie::BlockNumber,
{
#[cfg(feature = "std")]
fn set_offchain_storage(&mut self, key: &[u8], value: Option<&[u8]>) {
use sp_core::offchain::STORAGE_PREFIX;
match value {
Some(value) => self.overlay.offchain_set_storage(STORAGE_PREFIX, key, value),
None => self.overlay.offchain_remove_storage(STORAGE_PREFIX, key),
}
self.overlay.set_offchain_storage(key, value)
}
#[cfg(not(feature = "std"))]
fn set_offchain_storage(&mut self, _key: &[u8], _value: Option<&[u8]>) {}
fn storage(&self, key: &[u8]) -> Option<StorageValue> {
let _guard = guard();
let result = self.overlay.storage(key).map(|x| x.map(|x| x.to_vec())).unwrap_or_else(||
@@ -790,7 +782,6 @@ mod tests {
H256,
Blake2Hasher,
map,
offchain,
storage::{
Storage,
StorageChild,
@@ -813,14 +804,11 @@ mod tests {
changes.set_extrinsic_index(1);
changes.set_storage(vec![1], Some(vec![100]));
changes.set_storage(EXTRINSIC_INDEX.to_vec(), Some(3u32.encode()));
changes.set_offchain_storage(b"k1", Some(b"v1"));
changes.set_offchain_storage(b"k2", Some(b"v2"));
changes
}
fn prepare_offchain_overlay_with_changes(overlay: &mut OverlayedChanges) {
overlay.offchain_set_storage(offchain::STORAGE_PREFIX, b"k1", b"v1");
overlay.offchain_set_storage(offchain::STORAGE_PREFIX, b"k2", b"v2");
}
fn changes_trie_config() -> ChangesTrieConfiguration {
ChangesTrieConfiguration {
digest_interval: 0,
@@ -849,7 +837,6 @@ mod tests {
#[test]
fn storage_changes_root_is_some_when_extrinsic_changes_are_non_empty() {
let mut overlay = prepare_overlay_with_changes();
prepare_offchain_overlay_with_changes(&mut overlay);
let mut cache = StorageTransactionCache::default();
let storage = TestChangesTrieStorage::with_blocks(vec![(99, Default::default())]);
let state = Some(ChangesTrieState::new(changes_trie_config(), Zero::zero(), &storage));
@@ -864,7 +851,6 @@ mod tests {
#[test]
fn storage_changes_root_is_some_when_extrinsic_changes_are_empty() {
let mut overlay = prepare_overlay_with_changes();
prepare_offchain_overlay_with_changes(&mut overlay);
let mut cache = StorageTransactionCache::default();
overlay.set_collect_extrinsics(false);
overlay.set_storage(vec![1], None);
@@ -884,7 +870,6 @@ mod tests {
let mut overlay = OverlayedChanges::default();
overlay.set_storage(vec![20], None);
overlay.set_storage(vec![30], Some(vec![31]));
prepare_offchain_overlay_with_changes(&mut overlay);
let backend = Storage {
top: map![
vec![10] => vec![10],
@@ -939,8 +924,6 @@ mod tests {
],
}.into();
prepare_offchain_overlay_with_changes(&mut overlay);
let ext = TestExt::new(&mut overlay, &mut cache, &backend, None, None);
// next_backend < next_overlay
@@ -971,7 +954,6 @@ mod tests {
let mut overlay = OverlayedChanges::default();
overlay.set_child_storage(child_info, vec![20], None);
overlay.set_child_storage(child_info, vec![30], Some(vec![31]));
prepare_offchain_overlay_with_changes(&mut overlay);
let backend = Storage {
top: map![],
children_default: map![
@@ -1013,7 +995,6 @@ mod tests {
let child_info = &child_info;
let mut cache = StorageTransactionCache::default();
let mut overlay = OverlayedChanges::default();
prepare_offchain_overlay_with_changes(&mut overlay);
let backend = Storage {
top: map![],
children_default: map![
@@ -119,6 +119,8 @@ pub use crate::overlayed_changes::{
OverlayedChanges, StorageKey, StorageValue,
StorageCollection, ChildStorageCollection,
StorageChanges, StorageTransactionCache,
OffchainChangesCollection,
OffchainOverlayedChanges,
};
pub use crate::backend::Backend;
pub use crate::trie_backend_essence::{TrieBackendStorage, Storage};
@@ -25,6 +25,7 @@ use std::collections::HashSet as Set;
use sp_std::collections::btree_set::BTreeSet as Set;
use sp_std::collections::{btree_map::BTreeMap, btree_set::BTreeSet};
use sp_std::hash::Hash;
use smallvec::SmallVec;
use crate::warn;
@@ -32,8 +33,8 @@ const PROOF_OVERLAY_NON_EMPTY: &str = "\
An OverlayValue is always created with at least one transaction and dropped as soon
as the last transaction is removed; qed";
type DirtyKeysSets = SmallVec<[Set<StorageKey>; 5]>;
type Transactions = SmallVec<[InnerValue; 5]>;
type DirtyKeysSets<K> = SmallVec<[Set<K>; 5]>;
type Transactions<V> = SmallVec<[InnerValue<V>; 5]>;
/// Error returned when trying to commit or rollback while no transaction is open or
/// when the runtime is trying to close a transaction started by the client.
@@ -62,32 +63,46 @@ pub enum ExecutionMode {
#[derive(Debug, Default, Clone)]
#[cfg_attr(test, derive(PartialEq))]
struct InnerValue {
struct InnerValue<V> {
/// Current value. None if value has been deleted.
value: Option<StorageValue>,
value: V,
/// The set of extrinsic indices where the values has been changed.
/// Is filled only if runtime has announced changes trie support.
extrinsics: Extrinsics,
}
/// An overlay that contains all versions of a value for a specific key.
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq))]
pub struct OverlayedValue {
pub struct OverlayedEntry<V> {
/// The individual versions of that value.
/// One entry per transactions during that the value was actually written.
transactions: Transactions,
transactions: Transactions<V>,
}
impl<V> Default for OverlayedEntry<V> {
fn default() -> Self {
Self {
transactions: SmallVec::new(),
}
}
}
/// History of value, with removal support.
pub type OverlayedValue = OverlayedEntry<Option<StorageValue>>;
/// Change set for basic key value with extrinsics index recording and removal support.
pub type OverlayedChangeSet = OverlayedMap<StorageKey, Option<StorageValue>>;
/// Holds a set of changes with the ability modify them using nested transactions.
#[derive(Debug, Default, Clone)]
pub struct OverlayedChangeSet {
#[derive(Debug, Clone)]
pub struct OverlayedMap<K: Ord + Hash, V> {
/// Stores the changes that this overlay constitutes.
changes: BTreeMap<StorageKey, OverlayedValue>,
changes: BTreeMap<K, OverlayedEntry<V>>,
/// Stores which keys are dirty per transaction. Needed in order to determine which
/// values to merge into the parent transaction on commit. The length of this vector
/// therefore determines how many nested transactions are currently open (depth).
dirty_keys: DirtyKeysSets,
dirty_keys: DirtyKeysSets<K>,
/// The number of how many transactions beginning from the first transactions are started
/// by the client. Those transactions are protected against close (commit, rollback)
/// when in runtime mode.
@@ -96,16 +111,32 @@ pub struct OverlayedChangeSet {
execution_mode: ExecutionMode,
}
impl<K: Ord + Hash, V> Default for OverlayedMap<K, V> {
fn default() -> Self {
Self {
changes: BTreeMap::new(),
dirty_keys: SmallVec::new(),
num_client_transactions: Default::default(),
execution_mode: Default::default(),
}
}
}
impl Default for ExecutionMode {
fn default() -> Self {
Self::Client
}
}
impl OverlayedValue {
impl<V> OverlayedEntry<V> {
/// The value as seen by the current transaction.
pub fn value(&self) -> Option<&StorageValue> {
self.transactions.last().expect(PROOF_OVERLAY_NON_EMPTY).value.as_ref()
pub fn value_ref(&self) -> &V {
&self.transactions.last().expect(PROOF_OVERLAY_NON_EMPTY).value
}
/// The value as seen by the current transaction.
pub fn into_value(mut self) -> V {
self.transactions.pop().expect(PROOF_OVERLAY_NON_EMPTY).value
}
/// Unique list of extrinsic indices which modified the value.
@@ -116,12 +147,12 @@ impl OverlayedValue {
}
/// Mutable reference to the most recent version.
fn value_mut(&mut self) -> &mut Option<StorageValue> {
fn value_mut(&mut self) -> &mut V {
&mut self.transactions.last_mut().expect(PROOF_OVERLAY_NON_EMPTY).value
}
/// Remove the last version and return it.
fn pop_transaction(&mut self) -> InnerValue {
fn pop_transaction(&mut self) -> InnerValue<V> {
self.transactions.pop().expect(PROOF_OVERLAY_NON_EMPTY)
}
@@ -136,14 +167,14 @@ impl OverlayedValue {
/// rolled back when required.
fn set(
&mut self,
value: Option<StorageValue>,
value: V,
first_write_in_tx: bool,
at_extrinsic: Option<u32>,
) {
if first_write_in_tx || self.transactions.is_empty() {
self.transactions.push(InnerValue {
value,
.. Default::default()
extrinsics: Default::default(),
});
} else {
*self.value_mut() = value;
@@ -155,15 +186,22 @@ impl OverlayedValue {
}
}
impl OverlayedEntry<Option<StorageValue>> {
/// The value as seen by the current transaction.
pub fn value(&self) -> Option<&StorageValue> {
self.value_ref().as_ref()
}
}
/// Inserts a key into the dirty set.
///
/// Returns true iff we are currently have at least one open transaction and if this
/// is the first write to the given key that transaction.
fn insert_dirty(set: &mut DirtyKeysSets, key: StorageKey) -> bool {
fn insert_dirty<K: Ord + Hash>(set: &mut DirtyKeysSets<K>, key: K) -> bool {
set.last_mut().map(|dk| dk.insert(key)).unwrap_or_default()
}
impl OverlayedChangeSet {
impl<K: Ord + Hash + Clone, V> OverlayedMap<K, V> {
/// Create a new changeset at the same transaction state but without any contents.
///
/// This changeset might be created when there are already open transactions.
@@ -171,10 +209,10 @@ impl OverlayedChangeSet {
pub fn spawn_child(&self) -> Self {
use sp_std::iter::repeat;
Self {
changes: Default::default(),
dirty_keys: repeat(Set::new()).take(self.transaction_depth()).collect(),
num_client_transactions: self.num_client_transactions,
execution_mode: self.execution_mode,
.. Default::default()
}
}
@@ -184,7 +222,11 @@ impl OverlayedChangeSet {
}
/// Get an optional reference to the value stored for the specified key.
pub fn get(&self, key: &[u8]) -> Option<&OverlayedValue> {
pub fn get<Q>(&self, key: &Q) -> Option<&OverlayedEntry<V>>
where
K: sp_std::borrow::Borrow<Q>,
Q: Ord + ?Sized,
{
self.changes.get(key)
}
@@ -193,72 +235,30 @@ impl OverlayedChangeSet {
/// Can be rolled back or committed when called inside a transaction.
pub fn set(
&mut self,
key: StorageKey,
value: Option<StorageValue>,
key: K,
value: V,
at_extrinsic: Option<u32>,
) {
let overlayed = self.changes.entry(key.clone()).or_default();
overlayed.set(value, insert_dirty(&mut self.dirty_keys, key), at_extrinsic);
}
/// Get a mutable reference for a value.
///
/// Can be rolled back or committed when called inside a transaction.
#[must_use = "A change was registered, so this value MUST be modified."]
pub fn modify(
&mut self,
key: StorageKey,
init: impl Fn() -> StorageValue,
at_extrinsic: Option<u32>,
) -> &mut Option<StorageValue> {
let overlayed = self.changes.entry(key.clone()).or_default();
let first_write_in_tx = insert_dirty(&mut self.dirty_keys, key);
let clone_into_new_tx = if let Some(tx) = overlayed.transactions.last() {
if first_write_in_tx {
Some(tx.value.clone())
} else {
None
}
} else {
Some(Some(init()))
};
if let Some(cloned) = clone_into_new_tx {
overlayed.set(cloned, first_write_in_tx, at_extrinsic);
}
overlayed.value_mut()
}
/// Set all values to deleted which are matched by the predicate.
///
/// Can be rolled back or committed when called inside a transaction.
pub fn clear_where(
&mut self,
predicate: impl Fn(&[u8], &OverlayedValue) -> bool,
at_extrinsic: Option<u32>,
) {
for (key, val) in self.changes.iter_mut().filter(|(k, v)| predicate(k, v)) {
val.set(None, insert_dirty(&mut self.dirty_keys, key.clone()), at_extrinsic);
}
}
/// Get a list of all changes as seen by current transaction.
pub fn changes(&self) -> impl Iterator<Item=(&StorageKey, &OverlayedValue)> {
pub fn changes(&self) -> impl Iterator<Item=(&K, &OverlayedEntry<V>)> {
self.changes.iter()
}
/// Get the change that is next to the supplied key.
pub fn next_change(&self, key: &[u8]) -> Option<(&[u8], &OverlayedValue)> {
use sp_std::ops::Bound;
let range = (Bound::Excluded(key), Bound::Unbounded);
self.changes.range::<[u8], _>(range).next().map(|(k, v)| (&k[..], v))
/// Get a list of all changes as seen by current transaction, consumes
/// the overlay.
pub fn into_changes(self) -> impl Iterator<Item=(K, OverlayedEntry<V>)> {
self.changes.into_iter()
}
/// Consume this changeset and return all committed changes.
///
/// Panics:
/// Panics if there are open transactions: `transaction_depth() > 0`
pub fn drain_commited(self) -> impl Iterator<Item=(StorageKey, Option<StorageValue>)> {
pub fn drain_commited(self) -> impl Iterator<Item=(K, V)> {
assert!(self.transaction_depth() == 0, "Drain is not allowed with open transactions.");
self.changes.into_iter().map(|(k, mut v)| (k, v.pop_transaction().value))
}
@@ -384,6 +384,56 @@ impl OverlayedChangeSet {
}
}
impl OverlayedChangeSet {
/// Get a mutable reference for a value.
///
/// Can be rolled back or committed when called inside a transaction.
#[must_use = "A change was registered, so this value MUST be modified."]
pub fn modify(
&mut self,
key: StorageKey,
init: impl Fn() -> StorageValue,
at_extrinsic: Option<u32>,
) -> &mut Option<StorageValue> {
let overlayed = self.changes.entry(key.clone()).or_default();
let first_write_in_tx = insert_dirty(&mut self.dirty_keys, key);
let clone_into_new_tx = if let Some(tx) = overlayed.transactions.last() {
if first_write_in_tx {
Some(tx.value.clone())
} else {
None
}
} else {
Some(Some(init()))
};
if let Some(cloned) = clone_into_new_tx {
overlayed.set(cloned, first_write_in_tx, at_extrinsic);
}
overlayed.value_mut()
}
/// Set all values to deleted which are matched by the predicate.
///
/// Can be rolled back or committed when called inside a transaction.
pub fn clear_where(
&mut self,
predicate: impl Fn(&[u8], &OverlayedValue) -> bool,
at_extrinsic: Option<u32>,
) {
for (key, val) in self.changes.iter_mut().filter(|(k, v)| predicate(k, v)) {
val.set(None, insert_dirty(&mut self.dirty_keys, key.clone()), at_extrinsic);
}
}
/// Get the change that is next to the supplied key.
pub fn next_change(&self, key: &[u8]) -> Option<(&[u8], &OverlayedValue)> {
use sp_std::ops::Bound;
let range = (Bound::Excluded(key), Bound::Unbounded);
self.changes.range::<[u8], _>(range).next().map(|(k, v)| (&k[..], v))
}
}
#[cfg(test)]
mod test {
use super::*;
@@ -18,7 +18,9 @@
//! The overlayed changes to state.
mod changeset;
mod offchain;
pub use offchain::OffchainOverlayedChanges;
use crate::{
backend::Backend,
stats::StateMachineStats,
@@ -42,8 +44,7 @@ use sp_std::collections::btree_map::{BTreeMap as Map, Entry as MapEntry};
use sp_std::collections::btree_set::BTreeSet;
use codec::{Decode, Encode};
use sp_core::storage::{well_known_keys::EXTRINSIC_INDEX, ChildInfo};
#[cfg(feature = "std")]
use sp_core::offchain::storage::{OffchainOverlayedChanges, OffchainOverlayedChange};
use sp_core::offchain::OffchainOverlayedChange;
use hash_db::Hasher;
use crate::DefaultError;
use sp_externalities::{Extensions, Extension};
@@ -65,6 +66,9 @@ pub type StorageCollection = Vec<(StorageKey, Option<StorageValue>)>;
/// In memory arrays of storage values for multiple child tries.
pub type ChildStorageCollection = Vec<(StorageKey, StorageCollection)>;
/// In memory array of storage values.
pub type OffchainChangesCollection = Vec<((Vec<u8>, Vec<u8>), OffchainOverlayedChange)>;
/// Keep trace of extrinsics index for a modified value.
#[derive(Debug, Default, Eq, PartialEq, Clone)]
pub struct Extrinsics(Vec<u32>);
@@ -97,13 +101,12 @@ pub struct OverlayedChanges {
top: OverlayedChangeSet,
/// Child storage changes. The map key is the child storage key without the common prefix.
children: Map<StorageKey, (OverlayedChangeSet, ChildInfo)>,
/// Offchain related changes.
offchain: OffchainOverlayedChanges,
/// True if extrinsics stats must be collected.
collect_extrinsics: bool,
/// Collect statistic on this execution.
stats: StateMachineStats,
/// Offchain related changes.
#[cfg(feature = "std")]
offchain: OffchainOverlayedChanges,
}
/// A storage changes structure that can be generated by the data collected in [`OverlayedChanges`].
@@ -118,8 +121,7 @@ pub struct StorageChanges<Transaction, H: Hasher, N: BlockNumber> {
/// All changes to the child storages.
pub child_storage_changes: ChildStorageCollection,
/// Offchain state changes to write to the offchain database.
#[cfg(feature = "std")]
pub offchain_storage_changes: OffchainOverlayedChanges,
pub offchain_storage_changes: OffchainChangesCollection,
/// A transaction for the backend that contains all changes from
/// [`main_storage_changes`](StorageChanges::main_storage_changes) and from
/// [`child_storage_changes`](StorageChanges::child_storage_changes).
@@ -143,7 +145,7 @@ impl<Transaction, H: Hasher, N: BlockNumber> StorageChanges<Transaction, H, N> {
pub fn into_inner(self) -> (
StorageCollection,
ChildStorageCollection,
OffchainOverlayedChanges,
OffchainChangesCollection,
Transaction,
H::Out,
Option<ChangesTrieTransaction<H, N>>,
@@ -205,7 +207,6 @@ impl<Transaction: Default, H: Hasher, N: BlockNumber> Default for StorageChanges
Self {
main_storage_changes: Default::default(),
child_storage_changes: Default::default(),
#[cfg(feature = "std")]
offchain_storage_changes: Default::default(),
transaction: Default::default(),
transaction_storage_root: Default::default(),
@@ -375,6 +376,7 @@ impl OverlayedChanges {
for (_, (changeset, _)) in self.children.iter_mut() {
changeset.start_transaction();
}
self.offchain.overlay_mut().start_transaction();
}
/// Rollback the last transaction started by `start_transaction`.
@@ -388,6 +390,8 @@ impl OverlayedChanges {
.expect("Top and children changesets are started in lockstep; qed");
!changeset.is_empty()
});
self.offchain.overlay_mut().rollback_transaction()
.expect("Top and offchain changesets are started in lockstep; qed");
Ok(())
}
@@ -401,6 +405,8 @@ impl OverlayedChanges {
changeset.commit_transaction()
.expect("Top and children changesets are started in lockstep; qed");
}
self.offchain.overlay_mut().commit_transaction()
.expect("Top and offchain changesets are started in lockstep; qed");
Ok(())
}
@@ -414,6 +420,8 @@ impl OverlayedChanges {
changeset.enter_runtime()
.expect("Top and children changesets are entering runtime in lockstep; qed")
}
self.offchain.overlay_mut().enter_runtime()
.expect("Top and offchain changesets are started in lockstep; qed");
Ok(())
}
@@ -427,6 +435,8 @@ impl OverlayedChanges {
changeset.exit_runtime()
.expect("Top and children changesets are entering runtime in lockstep; qed");
}
self.offchain.overlay_mut().exit_runtime()
.expect("Top and offchain changesets are started in lockstep; qed");
Ok(())
}
@@ -452,6 +462,16 @@ impl OverlayedChanges {
)
}
/// Consume all changes (top + children) and return them.
///
/// After calling this function no more changes are contained in this changeset.
///
/// Panics:
/// Panics if `transaction_depth() > 0`
pub fn offchain_drain_committed(&mut self) -> impl Iterator<Item=((StorageKey, StorageKey), OffchainOverlayedChange)> {
self.offchain.drain()
}
/// Get an iterator over all child changes as seen by the current transaction.
pub fn children(&self)
-> impl Iterator<Item=(impl Iterator<Item=(&StorageKey, &OverlayedValue)>, &ChildInfo)> {
@@ -521,12 +541,12 @@ impl OverlayedChanges {
.expect("Changes trie transaction was generated by `changes_trie_root`; qed");
let (main_storage_changes, child_storage_changes) = self.drain_committed();
let offchain_storage_changes = self.offchain_drain_committed().collect();
Ok(StorageChanges {
main_storage_changes: main_storage_changes.collect(),
child_storage_changes: child_storage_changes.map(|(sk, it)| (sk, it.0.collect())).collect(),
#[cfg(feature = "std")]
offchain_storage_changes: std::mem::take(&mut self.offchain),
offchain_storage_changes,
transaction,
transaction_storage_root,
#[cfg(feature = "std")]
@@ -633,38 +653,18 @@ impl OverlayedChanges {
)
}
/// Set a value in the offchain storage.
#[cfg(feature = "std")]
pub fn offchain_set_storage(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) {
self.offchain.set(prefix, key, value);
}
/// Clear a value in the offchain storage.
#[cfg(feature = "std")]
pub fn offchain_remove_storage(&mut self, prefix: &[u8], key: &[u8]) {
self.offchain.remove(prefix, key);
}
/// Get a value in the offchain storage.
#[cfg(feature = "std")]
pub fn offchain_get_storage(
&mut self,
prefix: &[u8],
key: &[u8],
) -> Option<OffchainOverlayedChange> {
self.offchain.get(prefix, key)
}
/// Returns a reference to the offchain overlay.
#[cfg(feature = "std")]
pub fn offchain_overlay(&self) -> &OffchainOverlayedChanges {
/// Read only access ot offchain overlay.
pub fn offchain(&self) -> &OffchainOverlayedChanges {
&self.offchain
}
/// Returns a mutable reference to the offchain overlay.
#[cfg(feature = "std")]
pub fn offchain_overlay_mut(&mut self) -> &mut OffchainOverlayedChanges {
&mut self.offchain
/// Write a key value pair to the offchain storage overlay.
pub fn set_offchain_storage(&mut self, key: &[u8], value: Option<&[u8]>) {
use sp_core::offchain::STORAGE_PREFIX;
match value {
Some(value) => self.offchain.set(STORAGE_PREFIX, key, value),
None => self.offchain.remove(STORAGE_PREFIX, key),
}
}
}
@@ -804,6 +804,61 @@ mod tests {
assert!(overlayed.storage(&key).unwrap().is_none());
}
#[test]
fn offchain_overlayed_storage_transactions_works() {
use sp_core::offchain::STORAGE_PREFIX;
fn check_offchain_content(
state: &OverlayedChanges,
nb_commit: usize,
expected: Vec<(Vec<u8>, Option<Vec<u8>>)>,
) {
let mut state = state.clone();
for _ in 0..nb_commit {
state.commit_transaction().unwrap();
}
let offchain_data: Vec<_> = state.offchain_drain_committed().collect();
let expected: Vec<_> = expected.into_iter().map(|(key, value)| {
let change = match value {
Some(value) => OffchainOverlayedChange::SetValue(value),
None => OffchainOverlayedChange::Remove,
};
((STORAGE_PREFIX.to_vec(), key), change)
}).collect();
assert_eq!(offchain_data, expected);
}
let mut overlayed = OverlayedChanges::default();
let key = vec![42, 69, 169, 142];
check_offchain_content(&overlayed, 0, vec![]);
overlayed.start_transaction();
overlayed.set_offchain_storage(key.as_slice(), Some(&[1, 2, 3][..]));
check_offchain_content(&overlayed, 1, vec![(key.clone(), Some(vec![1, 2, 3]))]);
overlayed.commit_transaction().unwrap();
check_offchain_content(&overlayed, 0, vec![(key.clone(), Some(vec![1, 2, 3]))]);
overlayed.start_transaction();
overlayed.set_offchain_storage(key.as_slice(), Some(&[][..]));
check_offchain_content(&overlayed, 1, vec![(key.clone(), Some(vec![]))]);
overlayed.set_offchain_storage(key.as_slice(), None);
check_offchain_content(&overlayed, 1, vec![(key.clone(), None)]);
overlayed.rollback_transaction().unwrap();
check_offchain_content(&overlayed, 0, vec![(key.clone(), Some(vec![1, 2, 3]))]);
overlayed.set_offchain_storage(key.as_slice(), None);
check_offchain_content(&overlayed, 0, vec![(key.clone(), None)]);
}
#[test]
fn overlayed_storage_root_works() {
let initial: BTreeMap<_, _> = vec![
@@ -0,0 +1,130 @@
// This file is part of Substrate.
// Copyright (C) 2019-2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Overlayed changes for offchain indexing.
use sp_core::offchain::OffchainOverlayedChange;
use sp_std::prelude::Vec;
use super::changeset::OverlayedMap;
/// In-memory storage for offchain workers recoding changes for the actual offchain storage implementation.
#[derive(Debug, Clone, Default)]
pub struct OffchainOverlayedChanges(OverlayedMap<(Vec<u8>, Vec<u8>), OffchainOverlayedChange>);
/// Item for iterating over offchain changes.
///
/// First element i a tuple of `(prefix, key)`, second element ist the actual change
/// (remove or set value).
type OffchainOverlayedChangesItem<'i> = (&'i (Vec<u8>, Vec<u8>), &'i OffchainOverlayedChange);
/// Iterator over offchain changes, owned memory version.
type OffchainOverlayedChangesItemOwned = ((Vec<u8>, Vec<u8>), OffchainOverlayedChange);
impl OffchainOverlayedChanges {
/// Consume the offchain storage and iterate over all key value pairs.
pub fn into_iter(self) -> impl Iterator<Item = OffchainOverlayedChangesItemOwned> {
self.0.into_changes().map(|kv| (kv.0, kv.1.into_value()))
}
/// Iterate over all key value pairs by reference.
pub fn iter<'a>(&'a self) -> impl Iterator<Item = OffchainOverlayedChangesItem<'a>> {
self.0.changes().map(|kv| (kv.0, kv.1.value_ref()))
}
/// Drain all elements of changeset.
pub fn drain(&mut self) -> impl Iterator<Item = OffchainOverlayedChangesItemOwned> {
sp_std::mem::take(self).into_iter()
}
/// Remove a key and its associated value from the offchain database.
pub fn remove(&mut self, prefix: &[u8], key: &[u8]) {
let _ = self.0.set(
(prefix.to_vec(), key.to_vec()),
OffchainOverlayedChange::Remove,
None,
);
}
/// Set the value associated with a key under a prefix to the value provided.
pub fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) {
let _ = self.0.set(
(prefix.to_vec(), key.to_vec()),
OffchainOverlayedChange::SetValue(value.to_vec()),
None,
);
}
/// Obtain a associated value to the given key in storage with prefix.
pub fn get(&self, prefix: &[u8], key: &[u8]) -> Option<OffchainOverlayedChange> {
let key = (prefix.to_vec(), key.to_vec());
self.0.get(&key).map(|entry| entry.value_ref()).cloned()
}
/// Reference to inner change set.
pub fn overlay(&self) -> &OverlayedMap<(Vec<u8>, Vec<u8>), OffchainOverlayedChange> {
&self.0
}
/// Mutable reference to inner change set.
pub fn overlay_mut(&mut self) -> &mut OverlayedMap<(Vec<u8>, Vec<u8>), OffchainOverlayedChange> {
&mut self.0
}
}
#[cfg(test)]
mod test {
use super::*;
use sp_core::offchain::STORAGE_PREFIX;
#[test]
fn test_drain() {
let mut ooc = OffchainOverlayedChanges::default();
ooc.set(STORAGE_PREFIX, b"kkk", b"vvv");
let drained = ooc.drain().count();
assert_eq!(drained, 1);
let leftover = ooc.iter().count();
assert_eq!(leftover, 0);
ooc.set(STORAGE_PREFIX, b"a", b"v");
ooc.set(STORAGE_PREFIX, b"b", b"v");
ooc.set(STORAGE_PREFIX, b"c", b"v");
ooc.set(STORAGE_PREFIX, b"d", b"v");
ooc.set(STORAGE_PREFIX, b"e", b"v");
assert_eq!(ooc.iter().count(), 5);
}
#[test]
fn test_accumulated_set_remove_set() {
let mut ooc = OffchainOverlayedChanges::default();
ooc.set(STORAGE_PREFIX, b"ppp", b"qqq");
ooc.remove(STORAGE_PREFIX, b"ppp");
// keys are equiv, so it will overwrite the value and the overlay will contain
// one item
assert_eq!(ooc.iter().count(), 1);
ooc.set(STORAGE_PREFIX, b"ppp", b"rrr");
let mut iter = ooc.into_iter();
assert_eq!(
iter.next(),
Some(
((STORAGE_PREFIX.to_vec(), b"ppp".to_vec()),
OffchainOverlayedChange::SetValue(b"rrr".to_vec()))
)
);
assert_eq!(iter.next(), None);
}
}
@@ -127,7 +127,7 @@ impl<H: Hasher, N: ChangesTrieBlockNumber> TestExternalities<H, N>
/// Move offchain changes from overlay to the persistent store.
pub fn persist_offchain_overlay(&mut self) {
self.offchain_db.apply_offchain_changes(self.overlay.offchain_overlay_mut());
self.offchain_db.apply_offchain_changes(self.overlay.offchain_drain_committed());
}
/// A shared reference type around the offchain worker storage.