diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 1315dc32b6..2a2ff12ec2 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -4244,6 +4244,8 @@ dependencies = [ "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "sr-io 2.0.0", "sr-version 2.0.0", + "substrate-client 2.0.0", + "substrate-offchain 2.0.0", "substrate-panic-handler 2.0.0", "substrate-primitives 2.0.0", "substrate-serializer 2.0.0", @@ -4386,7 +4388,7 @@ dependencies = [ "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 2.0.0", "substrate-client 2.0.0", - "substrate-consensus-common 2.0.0", + "substrate-client-db 2.0.0", "substrate-offchain-primitives 2.0.0", "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", diff --git a/substrate/core/client/db/src/lib.rs b/substrate/core/client/db/src/lib.rs index b1c0596a0e..840a62dcd2 100644 --- a/substrate/core/client/db/src/lib.rs +++ b/substrate/core/client/db/src/lib.rs @@ -24,7 +24,10 @@ //! //! Finality implies canonicality but not vice-versa. +#![warn(missing_docs)] + pub mod light; +pub mod offchain; mod cache; mod storage_cache; @@ -77,6 +80,10 @@ const DEFAULT_CHILD_RATIO: (usize, usize) = (1, 10); /// DB-backed patricia trie state, transaction type is an overlay of changes to commit. pub type DbState = state_machine::TrieBackend>, Blake2Hasher>; +/// A reference tracking state. +/// +/// It makes sure that the hash we are using stays pinned in storage +/// until this structure is dropped. pub struct RefTrackingState { state: DbState, storage: Arc>, @@ -202,7 +209,7 @@ pub fn new_client( Ok(client::Client::new(backend, executor, genesis_storage, execution_strategies)?) } -mod columns { +pub(crate) mod columns { pub const META: Option = crate::utils::COLUMN_META; pub const STATE: Option = Some(1); pub const STATE_META: Option = Some(2); @@ -213,6 +220,8 @@ mod columns { pub const JUSTIFICATION: Option = Some(6); pub const CHANGES_TRIE: Option = Some(7); pub const AUX: Option = Some(8); + /// Offchain workers local storage + pub const OFFCHAIN: Option = Some(9); } struct PendingBlock { @@ -531,6 +540,7 @@ impl state_machine::Storage for DbGenesisStorage { } } +/// A database wrapper for changes tries. pub struct DbChangesTrieStorage { db: Arc, meta: Arc, Block::Hash>>>, @@ -675,6 +685,7 @@ where /// Otherwise, trie nodes are kept only from some recent blocks. pub struct Backend { storage: Arc>, + offchain_storage: offchain::LocalStorage, changes_tries_storage: DbChangesTrieStorage, /// None<*> means that the value hasn't been cached yet. Some(*) means that the value (either None or /// Some(*)) has been cached and is valid. @@ -689,31 +700,29 @@ impl> Backend { /// Create a new instance of database backend. /// /// The pruning window is how old a block must be before the state is pruned. - pub fn new(config: DatabaseSettings, canonicalization_delay: u64) -> Result { + pub fn new(config: DatabaseSettings, canonicalization_delay: u64) -> client::error::Result { Self::new_inner(config, canonicalization_delay) } - #[cfg(feature = "kvdb-rocksdb")] fn new_inner(config: DatabaseSettings, canonicalization_delay: u64) -> Result { + #[cfg(feature = "kvdb-rocksdb")] let db = crate::utils::open_database(&config, columns::META, "full")?; - Backend::from_kvdb(db as Arc<_>, canonicalization_delay, &config) - } - - #[cfg(not(feature = "kvdb-rocksdb"))] - fn new_inner(config: DatabaseSettings, canonicalization_delay: u64) -> Result { - log::warn!("Running without the RocksDB feature. The database will NOT be saved."); - let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS)); - Backend::from_kvdb(db as Arc<_>, canonicalization_delay, &config) + #[cfg(not(feature = "kvdb-rocksdb"))] + let db = { + log::warn!("Running without the RocksDB feature. The database will NOT be saved."); + Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS)) + }; + Self::from_kvdb(db as Arc<_>, canonicalization_delay, &config) } + /// Create new memory-backed client backend for tests. #[cfg(any(test, feature = "test-helpers"))] pub fn new_test(keep_blocks: u32, canonicalization_delay: u64) -> Self { - use utils::NUM_COLUMNS; - - let db = Arc::new(::kvdb_memorydb::create(NUM_COLUMNS)); + let db = Arc::new(kvdb_memorydb::create(crate::utils::NUM_COLUMNS)); Self::new_test_db(keep_blocks, canonicalization_delay, db as Arc<_>) } + /// Creates a client backend with test settings. #[cfg(any(test, feature = "test-helpers"))] pub fn new_test_db(keep_blocks: u32, canonicalization_delay: u64, db: Arc) -> Self { @@ -724,7 +733,7 @@ impl> Backend { path: Default::default(), pruning: PruningMode::keep_blocks(keep_blocks), }; - Backend::from_kvdb( + Self::from_kvdb( db, canonicalization_delay, &db_setting, @@ -745,6 +754,7 @@ impl> Backend { db: db.clone(), state_db, }; + let offchain_storage = offchain::LocalStorage::new(db.clone()); let changes_tries_storage = DbChangesTrieStorage { db, meta, @@ -754,6 +764,7 @@ impl> Backend { Ok(Backend { storage: Arc::new(storage_db), + offchain_storage, changes_tries_storage, changes_trie_config: Mutex::new(None), blockchain, @@ -1232,6 +1243,7 @@ impl client::backend::Backend for Backend whe type Blockchain = BlockchainDb; type State = CachingState, Block>; type ChangesTrieStorage = DbChangesTrieStorage; + type OffchainStorage = offchain::LocalStorage; fn begin_operation(&self) -> Result { let old_state = self.state_at(BlockId::Hash(Default::default()))?; @@ -1305,6 +1317,10 @@ impl client::backend::Backend for Backend whe Some(&self.changes_tries_storage) } + fn offchain_storage(&self) -> Option { + Some(self.offchain_storage.clone()) + } + fn revert(&self, n: NumberFor) -> Result, client::error::Error> { let mut best = self.blockchain.info().best_number; let finalized = self.blockchain.info().finalized_number; diff --git a/substrate/core/client/db/src/light.rs b/substrate/core/client/db/src/light.rs index 0abce00528..5bbd3b64cf 100644 --- a/substrate/core/client/db/src/light.rs +++ b/substrate/core/client/db/src/light.rs @@ -84,6 +84,7 @@ impl LightStorage Self::from_kvdb(db as Arc<_>) } + /// Create new memory-backed `LightStorage` for tests. #[cfg(any(test, feature = "test-helpers"))] pub fn new_test() -> Self { use utils::NUM_COLUMNS; diff --git a/substrate/core/client/db/src/offchain.rs b/substrate/core/client/db/src/offchain.rs new file mode 100644 index 0000000000..1ec6e357e6 --- /dev/null +++ b/substrate/core/client/db/src/offchain.rs @@ -0,0 +1,137 @@ +// Copyright 2017-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! RocksDB-based offchain workers local storage. + +use std::{ + collections::HashMap, + sync::Arc, +}; + +use crate::columns; +use kvdb::KeyValueDB; +use parking_lot::Mutex; + +/// Offchain local storage +#[derive(Clone)] +pub struct LocalStorage { + db: Arc, + locks: Arc, Arc>>>>, +} + +impl std::fmt::Debug for LocalStorage { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + fmt.debug_struct("LocalStorage") + .finish() + } +} + +impl LocalStorage { + /// Create new offchain storage for tests (backed by memorydb) + #[cfg(any(test, feature = "test-helpers"))] + pub fn new_test() -> Self { + let db = Arc::new(::kvdb_memorydb::create(crate::utils::NUM_COLUMNS)); + Self::new(db as _) + } + + /// Create offchain local storage with given `KeyValueDB` backend. + pub fn new(db: Arc) -> Self { + Self { + db, + locks: Default::default(), + } + } +} + +impl client::backend::OffchainStorage for LocalStorage { + fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) { + let key: Vec = prefix.iter().chain(key).cloned().collect(); + let mut tx = self.db.transaction(); + tx.put(columns::OFFCHAIN, &key, value); + + if let Err(e) = self.db.write(tx) { + log::warn!("Error writing to the offchain DB: {:?}", e); + } + } + + fn get(&self, prefix: &[u8], key: &[u8]) -> Option> { + let key: Vec = prefix.iter().chain(key).cloned().collect(); + self.db.get(columns::OFFCHAIN, &key) + .ok() + .and_then(|x| x) + .map(|v| v.to_vec()) + } + + fn compare_and_set( + &mut self, + prefix: &[u8], + item_key: &[u8], + old_value: &[u8], + new_value: &[u8], + ) -> bool { + let key: Vec = prefix.iter().chain(item_key).cloned().collect(); + let key_lock = { + let mut locks = self.locks.lock(); + locks.entry(key.clone()).or_default().clone() + }; + + let is_set; + { + let _key_guard = key_lock.lock(); + is_set = self.db.get(columns::OFFCHAIN, &key) + .ok() + .and_then(|x| x) + .map(|v| &*v == old_value) + .unwrap_or(true); + + if is_set { + self.set(prefix, item_key, new_value) + } + } + + // clean the lock map if we're the only entry + let mut locks = self.locks.lock(); + { + drop(key_lock); + let key_lock = locks.get_mut(&key); + if let Some(_) = key_lock.and_then(Arc::get_mut) { + locks.remove(&key); + } + } + is_set + } +} + +#[cfg(test)] +mod tests { + use super::*; + use client::backend::OffchainStorage; + + #[test] + fn should_compare_and_set_and_clear_the_locks_map() { + let mut storage = LocalStorage::new_test(); + let prefix = b"prefix"; + let key = b"key"; + let value = b"value"; + + storage.set(prefix, key, value); + assert_eq!(storage.get(prefix, key), Some(value.to_vec())); + + assert_eq!(storage.compare_and_set(prefix, key, value, b"asd"), true); + assert_eq!(storage.get(prefix, key), Some(b"asd".to_vec())); + assert!(storage.locks.lock().is_empty(), "Locks map should be empty!"); + } +} diff --git a/substrate/core/client/db/src/utils.rs b/substrate/core/client/db/src/utils.rs index a4ab82b5d8..39862dba85 100644 --- a/substrate/core/client/db/src/utils.rs +++ b/substrate/core/client/db/src/utils.rs @@ -17,7 +17,9 @@ //! Db-based backend utility structures and functions, used by both //! full and light storages. -use std::{io, convert::TryInto, sync::Arc}; +#[cfg(feature = "kvdb-rocksdb")] +use std::sync::Arc; +use std::{io, convert::TryInto}; use kvdb::{KeyValueDB, DBTransaction}; #[cfg(feature = "kvdb-rocksdb")] @@ -32,11 +34,12 @@ use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, Zero, UniqueSaturatedFrom, UniqueSaturatedInto, CheckedConversion }; +#[cfg(feature = "kvdb-rocksdb")] use crate::DatabaseSettings; /// Number of columns in the db. Must be the same for both full && light dbs. /// Otherwise RocksDb will fail to open database && check its type. -pub const NUM_COLUMNS: u32 = 9; +pub const NUM_COLUMNS: u32 = 10; /// Meta column. The set of keys in the column is shared by full && light storages. pub const COLUMN_META: Option = Some(0); diff --git a/substrate/core/client/src/backend.rs b/substrate/core/client/src/backend.rs index 8860f61c47..110b7b4d44 100644 --- a/substrate/core/client/src/backend.rs +++ b/substrate/core/client/src/backend.rs @@ -139,6 +139,8 @@ pub trait Backend: AuxStore + Send + Sync where type State: StateBackend; /// Changes trie storage. type ChangesTrieStorage: PrunableStateChangesTrieStorage; + /// Offchain workers local storage. + type OffchainStorage: OffchainStorage; /// Begin a new block insertion transaction with given parent block id. /// When constructing the genesis, this is called with all-zero hash. @@ -156,6 +158,8 @@ pub trait Backend: AuxStore + Send + Sync where fn used_state_cache_size(&self) -> Option; /// Returns reference to changes trie storage. fn changes_trie_storage(&self) -> Option<&Self::ChangesTrieStorage>; + /// Returns a handle to offchain storage. + fn offchain_storage(&self) -> Option; /// Returns true if state for given block is available. fn have_state_at(&self, hash: &Block::Hash, _number: NumberFor) -> bool { self.state_at(BlockId::Hash(hash.clone())).is_ok() @@ -194,6 +198,26 @@ pub trait Backend: AuxStore + Send + Sync where fn get_import_lock(&self) -> &Mutex<()>; } +/// Offchain workers local storage. +pub trait OffchainStorage: Clone + Send + Sync { + /// Persist a value in storage under given key and prefix. + fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]); + + /// Retrieve a value from storage under given key and prefix. + fn get(&self, prefix: &[u8], key: &[u8]) -> Option>; + + /// Replace the value in storage if given old_value matches the current one. + /// + /// Returns `true` if the value has been set and false otherwise. + fn compare_and_set( + &mut self, + prefix: &[u8], + key: &[u8], + old_value: &[u8], + new_value: &[u8], + ) -> bool; +} + /// Changes trie storage that supports pruning. pub trait PrunableStateChangesTrieStorage: StateChangesTrieStorage> diff --git a/substrate/core/client/src/in_mem.rs b/substrate/core/client/src/in_mem.rs index d0283147fa..b6546a67bb 100644 --- a/substrate/core/client/src/in_mem.rs +++ b/substrate/core/client/src/in_mem.rs @@ -593,6 +593,7 @@ where type Blockchain = Blockchain; type State = InMemory; type ChangesTrieStorage = ChangesTrieStorage; + type OffchainStorage = OffchainStorage; fn begin_operation(&self) -> error::Result { let old_state = self.state_at(BlockId::Hash(Default::default()))?; @@ -669,6 +670,10 @@ where Some(&self.changes_trie_storage) } + fn offchain_storage(&self) -> Option { + None + } + fn state_at(&self, block: BlockId) -> error::Result { match block { BlockId::Hash(h) if h == Default::default() => { @@ -768,8 +773,46 @@ pub fn check_genesis_storage(top: &StorageOverlay, children: &ChildrenStorageOve Ok(()) } +/// In-memory storage for offchain workers. +#[derive(Debug, Clone, Default)] +pub struct OffchainStorage { + storage: HashMap, Vec>, +} + +impl backend::OffchainStorage for OffchainStorage { + fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) { + let key = prefix.iter().chain(key).cloned().collect(); + self.storage.insert(key, value.to_vec()); + } + + fn get(&self, prefix: &[u8], key: &[u8]) -> Option> { + let key: Vec = prefix.iter().chain(key).cloned().collect(); + self.storage.get(&key).cloned() + } + + fn compare_and_set( + &mut self, + prefix: &[u8], + key: &[u8], + old_value: &[u8], + new_value: &[u8], + ) -> bool { + let key = prefix.iter().chain(key).cloned().collect(); + + let mut set = false; + self.storage.entry(key).and_modify(|val| { + if val.as_slice() == old_value { + *val = new_value.to_vec(); + set = true; + } + }); + set + } +} + #[cfg(test)] mod tests { + use super::*; use std::sync::Arc; use test_client; use primitives::Blake2Hasher; @@ -789,4 +832,22 @@ mod tests { test_client::trait_tests::test_blockchain_query_by_number_gets_canonical(backend); } + + #[test] + fn in_memory_offchain_storage() { + use crate::backend::OffchainStorage as _; + + let mut storage = OffchainStorage::default(); + assert_eq!(storage.get(b"A", b"B"), None); + assert_eq!(storage.get(b"B", b"A"), None); + + storage.set(b"A", b"B", b"C"); + assert_eq!(storage.get(b"A", b"B"), Some(b"C".to_vec())); + assert_eq!(storage.get(b"B", b"A"), None); + + storage.compare_and_set(b"A", b"B", b"X", b"D"); + assert_eq!(storage.get(b"A", b"B"), Some(b"C".to_vec())); + storage.compare_and_set(b"A", b"B", b"C", b"D"); + assert_eq!(storage.get(b"A", b"B"), Some(b"D".to_vec())); + } } diff --git a/substrate/core/client/src/light/backend.rs b/substrate/core/client/src/light/backend.rs index 1104518811..96c0c07f67 100644 --- a/substrate/core/client/src/light/backend.rs +++ b/substrate/core/client/src/light/backend.rs @@ -118,6 +118,7 @@ impl ClientBackend for Backend where type Blockchain = Blockchain; type State = OnDemandOrGenesisState; type ChangesTrieStorage = in_mem::ChangesTrieStorage; + type OffchainStorage = in_mem::OffchainStorage; fn begin_operation(&self) -> ClientResult { Ok(ImportOperation { @@ -195,6 +196,10 @@ impl ClientBackend for Backend where None } + fn offchain_storage(&self) -> Option { + None + } + fn state_at(&self, block: BlockId) -> ClientResult { let block_number = self.blockchain.expect_block_number_from_id(&block)?; diff --git a/substrate/core/executor/Cargo.toml b/substrate/core/executor/Cargo.toml index 805e1b6ad0..c3736a3063 100644 --- a/substrate/core/executor/Cargo.toml +++ b/substrate/core/executor/Cargo.toml @@ -26,6 +26,8 @@ tiny-keccak = "1.4.2" assert_matches = "1.1" wabt = "~0.7.4" hex-literal = "0.2.0" +substrate-client = { path = "../client" } +substrate-offchain = { path = "../offchain/" } [features] default = [] diff --git a/substrate/core/executor/src/wasm_executor.rs b/substrate/core/executor/src/wasm_executor.rs index cbb47195de..153760ee78 100644 --- a/substrate/core/executor/src/wasm_executor.rs +++ b/substrate/core/executor/src/wasm_executor.rs @@ -857,24 +857,28 @@ impl_function_executor!(this: FunctionExecutor<'e, E>, .map_err(|_| "Invalid attempt to set value in ext_random_seed")?; Ok(()) }, - ext_local_storage_set(key: *const u8, key_len: u32, value: *const u8, value_len: u32) => { + ext_local_storage_set(kind: u32, key: *const u8, key_len: u32, value: *const u8, value_len: u32) => { + let kind = offchain::StorageKind::try_from(kind) + .map_err(|_| "storage kind OOB while ext_local_storage_set: wasm")?; let key = this.memory.get(key, key_len as usize) .map_err(|_| "OOB while ext_local_storage_set: wasm")?; let value = this.memory.get(value, value_len as usize) .map_err(|_| "OOB while ext_local_storage_set: wasm")?; this.ext.offchain() - .map(|api| api.local_storage_set(&key, &value)) + .map(|api| api.local_storage_set(kind, &key, &value)) .ok_or_else(|| "Calling unavailable API ext_local_storage_set: wasm")?; Ok(()) }, - ext_local_storage_get(key: *const u8, key_len: u32, value_len: *mut u32) -> *mut u8 => { + ext_local_storage_get(kind: u32, key: *const u8, key_len: u32, value_len: *mut u32) -> *mut u8 => { + let kind = offchain::StorageKind::try_from(kind) + .map_err(|_| "storage kind OOB while ext_local_storage_get: wasm")?; let key = this.memory.get(key, key_len as usize) .map_err(|_| "OOB while ext_local_storage_get: wasm")?; let maybe_value = this.ext.offchain() - .map(|api| api.local_storage_get(&key)) + .map(|api| api.local_storage_get(kind, &key)) .ok_or_else(|| "Calling unavailable API ext_local_storage_get: wasm")?; let (offset, len) = if let Some(value) = maybe_value { @@ -891,6 +895,31 @@ impl_function_executor!(this: FunctionExecutor<'e, E>, Ok(offset) }, + ext_local_storage_compare_and_set( + kind: u32, + key: *const u8, + key_len: u32, + old_value: *const u8, + old_value_len: u32, + new_value: *const u8, + new_value_len: u32 + ) -> u32 => { + let kind = offchain::StorageKind::try_from(kind) + .map_err(|_| "storage kind OOB while ext_local_storage_compare_and_set: wasm")?; + let key = this.memory.get(key, key_len as usize) + .map_err(|_| "OOB while ext_local_storage_compare_and_set: wasm")?; + let old_value = this.memory.get(old_value, old_value_len as usize) + .map_err(|_| "OOB while ext_local_storage_compare_and_set: wasm")?; + let new_value = this.memory.get(new_value, new_value_len as usize) + .map_err(|_| "OOB while ext_local_storage_compare_and_set: wasm")?; + + let res = this.ext.offchain() + .map(|api| api.local_storage_compare_and_set(kind, &key, &old_value, &new_value)) + .ok_or_else(|| "Calling unavailable API ext_local_storage_compare_andset: wasm")?; + + Ok(if res { 0 } else { 1 }) + + }, ext_http_request_start( method: *const u8, method_len: u32, @@ -1365,6 +1394,7 @@ mod tests { use state_machine::TestExternalities as CoreTestExternalities; use hex_literal::hex; use primitives::map; + use substrate_offchain::testing; type TestExternalities = CoreTestExternalities; @@ -1550,4 +1580,45 @@ mod tests { ordered_trie_root::(vec![b"zero".to_vec(), b"one".to_vec(), b"two".to_vec()].iter()).as_fixed_bytes().encode() ); } + + #[test] + fn offchain_local_storage_should_work() { + use substrate_client::backend::OffchainStorage; + + let mut ext = TestExternalities::::default(); + let (offchain, state) = testing::TestOffchainExt::new(); + ext.set_offchain_externalities(offchain); + let test_code = include_bytes!("../wasm/target/wasm32-unknown-unknown/release/runtime_test.compact.wasm"); + assert_eq!( + WasmExecutor::new().call(&mut ext, 8, &test_code[..], "test_offchain_local_storage", &[]).unwrap(), + vec![0] + ); + assert_eq!(state.read().persistent_storage.get(b"", b"test"), Some(vec![])); + } + + #[test] + fn offchain_http_should_work() { + let mut ext = TestExternalities::::default(); + let (offchain, state) = testing::TestOffchainExt::new(); + ext.set_offchain_externalities(offchain); + state.write().expect_request( + 0, + testing::PendingRequest { + method: "POST".into(), + uri: "http://localhost:12345".into(), + body: vec![1, 2, 3, 4], + headers: vec![("X-Auth".to_owned(), "test".to_owned())], + sent: true, + response: vec![1, 2, 3], + response_headers: vec![("X-Auth".to_owned(), "hello".to_owned())], + ..Default::default() + }, + ); + + let test_code = include_bytes!("../wasm/target/wasm32-unknown-unknown/release/runtime_test.compact.wasm"); + assert_eq!( + WasmExecutor::new().call(&mut ext, 8, &test_code[..], "test_offchain_http", &[]).unwrap(), + vec![0] + ); + } } diff --git a/substrate/core/executor/wasm/Cargo.lock b/substrate/core/executor/wasm/Cargo.lock index 26ea4bdecf..e9665f89e7 100644 --- a/substrate/core/executor/wasm/Cargo.lock +++ b/substrate/core/executor/wasm/Cargo.lock @@ -125,6 +125,7 @@ version = "2.0.0" dependencies = [ "sr-io 2.0.0", "sr-sandbox 2.0.0", + "sr-std 2.0.0", "substrate-primitives 2.0.0", ] diff --git a/substrate/core/executor/wasm/Cargo.toml b/substrate/core/executor/wasm/Cargo.toml index e9f829746d..f703dcb3d0 100644 --- a/substrate/core/executor/wasm/Cargo.toml +++ b/substrate/core/executor/wasm/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" crate-type = ["cdylib"] [dependencies] +rstd = { package = "sr-std", path = "../../sr-std", default-features = false } runtime_io = { package = "sr-io", path = "../../sr-io", default-features = false } sandbox = { package = "sr-sandbox", path = "../../sr-sandbox", default-features = false } substrate-primitives = { path = "../../primitives", default-features = false } diff --git a/substrate/core/executor/wasm/src/lib.rs b/substrate/core/executor/wasm/src/lib.rs index 41f071ca9f..6dbfe67ff1 100644 --- a/substrate/core/executor/wasm/src/lib.rs +++ b/substrate/core/executor/wasm/src/lib.rs @@ -1,9 +1,7 @@ #![no_std] #![cfg_attr(feature = "strict", deny(warnings))] -extern crate alloc; -use alloc::vec::Vec; -use alloc::slice; +use rstd::{slice, vec::Vec, vec}; use runtime_io::{ set_storage, storage, clear_prefix, print, blake2_128, blake2_256, @@ -11,7 +9,7 @@ use runtime_io::{ }; macro_rules! impl_stubs { - ( $( $new_name:ident => $invoke:expr ),* ) => { + ( $( $new_name:ident => $invoke:expr, )* ) => { $( impl_stubs!(@METHOD $new_name => $invoke); )* @@ -134,7 +132,42 @@ impl_stubs!( Err(sandbox::Error::OutOfBounds) => 3, }; [code].to_vec() - } + }, + test_offchain_local_storage => |_| { + let kind = substrate_primitives::offchain::StorageKind::PERSISTENT; + assert_eq!(runtime_io::local_storage_get(kind, b"test"), None); + runtime_io::local_storage_set(kind, b"test", b"asd"); + assert_eq!(runtime_io::local_storage_get(kind, b"test"), Some(b"asd".to_vec())); + + let res = runtime_io::local_storage_compare_and_set(kind, b"test", b"asd", b""); + assert_eq!(res, true); + assert_eq!(runtime_io::local_storage_get(kind, b"test"), Some(b"".to_vec())); + + [0].to_vec() + }, + test_offchain_http => |_| { + use substrate_primitives::offchain::HttpRequestStatus; + let run = || -> Option<()> { + let id = runtime_io::http_request_start("POST", "http://localhost:12345", &[]).ok()?; + runtime_io::http_request_add_header(id, "X-Auth", "test").ok()?; + runtime_io::http_request_write_body(id, &[1, 2, 3, 4], None).ok()?; + runtime_io::http_request_write_body(id, &[], None).ok()?; + let status = runtime_io::http_response_wait(&[id], None); + assert!(status == vec![HttpRequestStatus::Finished(200)], "Expected Finished(200) status."); + let headers = runtime_io::http_response_headers(id); + assert_eq!(headers, vec![(b"X-Auth".to_vec(), b"hello".to_vec())]); + let mut buffer = vec![0; 64]; + let read = runtime_io::http_response_read_body(id, &mut buffer, None).ok()?; + assert_eq!(read, 3); + assert_eq!(&buffer[0..read], &[1, 2, 3]); + let read = runtime_io::http_response_read_body(id, &mut buffer, None).ok()?; + assert_eq!(read, 0); + + Some(()) + }; + + [if run().is_some() { 0 } else { 1 }].to_vec() + }, ); fn execute_sandboxed(code: &[u8], args: &[sandbox::TypedValue]) -> Result { diff --git a/substrate/core/offchain/Cargo.toml b/substrate/core/offchain/Cargo.toml index c272653b64..758865b49c 100644 --- a/substrate/core/offchain/Cargo.toml +++ b/substrate/core/offchain/Cargo.toml @@ -8,7 +8,6 @@ edition = "2018" [dependencies] client = { package = "substrate-client", path = "../../core/client" } -consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common" } futures = "0.1.25" log = "0.4" offchain-primitives = { package = "substrate-offchain-primitives", path = "./primitives" } @@ -20,6 +19,7 @@ transaction_pool = { package = "substrate-transaction-pool", path = "../../core/ [dev-dependencies] env_logger = "0.6" +client-db = { package = "substrate-client-db", path = "../../core/client/db/", default-features = true } test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } tokio = "0.1.7" diff --git a/substrate/core/offchain/src/api.rs b/substrate/core/offchain/src/api.rs index d2c7630c24..bb5db3daff 100644 --- a/substrate/core/offchain/src/api.rs +++ b/substrate/core/offchain/src/api.rs @@ -15,6 +15,7 @@ // along with Substrate. If not, see . use std::sync::Arc; +use client::backend::OffchainStorage; use futures::{Stream, Future, sync::mpsc}; use log::{info, debug, warn, error}; use parity_codec::Decode; @@ -22,6 +23,7 @@ use primitives::offchain::{ Timestamp, HttpRequestId, HttpRequestStatus, HttpError, Externalities as OffchainExt, CryptoKind, CryptoKeyId, + StorageKind, }; use runtime_primitives::{ generic::BlockId, @@ -37,17 +39,24 @@ enum ExtMessage { /// Asynchronous offchain API. /// /// NOTE this is done to prevent recursive calls into the runtime (which are not supported currently). -pub(crate) struct AsyncApi(mpsc::UnboundedSender); +pub(crate) struct Api { + sender: mpsc::UnboundedSender, + db: S, +} fn unavailable_yet(name: &str) -> R { - error!("This {:?} API is not available for offchain workers yet. Follow + error!("The {:?} API is not available for offchain workers yet. Follow \ https://github.com/paritytech/substrate/issues/1458 for details", name); Default::default() } -impl OffchainExt for AsyncApi { +const LOCAL_DB: &str = "LOCAL (fork-aware) DB"; +const STORAGE_PREFIX: &[u8] = b"storage"; + +impl OffchainExt for Api { fn submit_transaction(&mut self, ext: Vec) -> Result<(), ()> { - self.0.unbounded_send(ExtMessage::SubmitExtrinsic(ext)) + self.sender + .unbounded_send(ExtMessage::SubmitExtrinsic(ext)) .map(|_| ()) .map_err(|_| ()) } @@ -89,16 +98,33 @@ impl OffchainExt for AsyncApi { unavailable_yet("random_seed") } - fn local_storage_set(&mut self, _key: &[u8], _value: &[u8]) { - unavailable_yet("local_storage_set") + fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) { + match kind { + StorageKind::PERSISTENT => self.db.set(STORAGE_PREFIX, key, value), + StorageKind::LOCAL => unavailable_yet(LOCAL_DB), + } } - fn local_storage_compare_and_set(&mut self, _key: &[u8], _old_value: &[u8], _new_value: &[u8]) { - unavailable_yet("local_storage_compare_and_set") + fn local_storage_compare_and_set( + &mut self, + kind: StorageKind, + key: &[u8], + old_value: &[u8], + new_value: &[u8], + ) -> bool { + match kind { + StorageKind::PERSISTENT => { + self.db.compare_and_set(STORAGE_PREFIX, key, old_value, new_value) + }, + StorageKind::LOCAL => unavailable_yet(LOCAL_DB), + } } - fn local_storage_get(&mut self, _key: &[u8]) -> Option> { - unavailable_yet("local_storage_get") + fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option> { + match kind { + StorageKind::PERSISTENT => self.db.get(STORAGE_PREFIX, key), + StorageKind::LOCAL => unavailable_yet(LOCAL_DB), + } } fn http_request_start( @@ -159,24 +185,35 @@ impl OffchainExt for AsyncApi { } /// Offchain extensions implementation API -pub(crate) struct Api { +/// +/// This is the asynchronous processing part of the API. +pub(crate) struct AsyncApi { receiver: Option>, transaction_pool: Arc>, at: BlockId, } -impl Api { - pub fn new( +impl AsyncApi { + /// Creates new Offchain extensions API implementation an the asynchronous processing part. + pub fn new( transaction_pool: Arc>, + db: S, at: BlockId, - ) -> (AsyncApi, Self) { - let (tx, rx) = mpsc::unbounded(); - let api = Self { + ) -> (Api, AsyncApi) { + let (sender, rx) = mpsc::unbounded(); + + let api = Api { + sender, + db, + }; + + let async_api = AsyncApi { receiver: Some(rx), transaction_pool, at, }; - (AsyncApi(tx), api) + + (api, async_api) } /// Run a processing task for the API @@ -209,3 +246,52 @@ impl Api { } } } + +#[cfg(test)] +mod tests { + use super::*; + use client_db::offchain::LocalStorage; + + fn offchain_api() -> (Api, AsyncApi) { + let _ = env_logger::try_init(); + let db = LocalStorage::new_test(); + let client = Arc::new(test_client::new()); + let pool = Arc::new( + Pool::new(Default::default(), transaction_pool::ChainApi::new(client.clone())) + ); + + AsyncApi::new(pool, db, BlockId::Number(0)) + } + + #[test] + fn should_set_and_get_local_storage() { + // given + let kind = StorageKind::PERSISTENT; + let mut api = offchain_api().0; + let key = b"test"; + + // when + assert_eq!(api.local_storage_get(kind, key), None); + api.local_storage_set(kind, key, b"value"); + + // then + assert_eq!(api.local_storage_get(kind, key), Some(b"value".to_vec())); + } + + #[test] + fn should_compare_and_set_local_storage() { + // given + let kind = StorageKind::PERSISTENT; + let mut api = offchain_api().0; + let key = b"test"; + api.local_storage_set(kind, key, b"value"); + + // when + assert_eq!(api.local_storage_compare_and_set(kind, key, b"val", b"xxx"), false); + assert_eq!(api.local_storage_get(kind, key), Some(b"value".to_vec())); + + // when + assert_eq!(api.local_storage_compare_and_set(kind, key, b"value", b"xxx"), true); + assert_eq!(api.local_storage_get(kind, key), Some(b"xxx".to_vec())); + } +} diff --git a/substrate/core/offchain/src/lib.rs b/substrate/core/offchain/src/lib.rs index 96c0a19077..88676e6ec8 100644 --- a/substrate/core/offchain/src/lib.rs +++ b/substrate/core/offchain/src/lib.rs @@ -56,31 +56,35 @@ pub mod testing; pub use offchain_primitives::OffchainWorkerApi; /// An offchain workers manager. -pub struct OffchainWorkers { +pub struct OffchainWorkers { client: Arc, + db: S, _block: PhantomData, } -impl fmt::Debug for OffchainWorkers { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("OffchainWorkers").finish() - } -} - -impl OffchainWorkers { +impl OffchainWorkers { /// Creates new `OffchainWorkers`. pub fn new( client: Arc, + db: S, ) -> Self { Self { client, + db, _block: PhantomData, } } } -impl OffchainWorkers where +impl fmt::Debug for OffchainWorkers { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("OffchainWorkers").finish() + } +} + +impl OffchainWorkers where Block: traits::Block, + S: client::backend::OffchainStorage + 'static, C: ProvideRuntimeApi, C::Api: OffchainWorkerApi, { @@ -99,7 +103,11 @@ impl OffchainWorkers where debug!("Checking offchain workers at {:?}: {:?}", at, has_api); if has_api.unwrap_or(false) { - let (api, runner) = api::Api::new(pool.clone(), at.clone()); + let (api, runner) = api::AsyncApi::new( + pool.clone(), + self.db.clone(), + at.clone(), + ); debug!("Running offchain workers at {:?}", at); let api = Box::new(api); runtime.offchain_worker_with_context(&at, ExecutionContext::OffchainWorker(api), *number).unwrap(); @@ -122,9 +130,10 @@ mod tests { let runtime = tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let pool = Arc::new(Pool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone()))); + let db = client_db::offchain::LocalStorage::new_test(); // when - let offchain = OffchainWorkers::new(client); + let offchain = OffchainWorkers::new(client, db); runtime.executor().spawn(offchain.on_block_imported(&0u64, &pool)); // then diff --git a/substrate/core/offchain/src/testing.rs b/substrate/core/offchain/src/testing.rs index 3419665d0a..15e1a01ecb 100644 --- a/substrate/core/offchain/src/testing.rs +++ b/substrate/core/offchain/src/testing.rs @@ -20,6 +20,7 @@ use std::{ collections::BTreeMap, sync::Arc, }; +use client::backend::OffchainStorage; use parking_lot::RwLock; use primitives::offchain::{ self, @@ -29,6 +30,7 @@ use primitives::offchain::{ Timestamp, CryptoKind, CryptoKeyId, + StorageKind, }; /// Pending request. @@ -61,6 +63,11 @@ pub struct PendingRequest { pub struct State { /// A list of pending requests. pub requests: BTreeMap, + expected_requests: BTreeMap, + /// Persistent local storage + pub persistent_storage: client::in_mem::OffchainStorage, + /// Local storage + pub local_storage: client::in_mem::OffchainStorage, } impl State { @@ -74,7 +81,7 @@ impl State { ) { match self.requests.get_mut(&RequestId(id)) { None => { - panic!("Missing expected request: {:?}.\n\nAll: {:?}", id, self.requests); + panic!("Missing pending request: {:?}.\n\nAll: {:?}", id, self.requests); } Some(req) => { assert_eq!( @@ -86,12 +93,47 @@ impl State { } } } + + fn fulfill_expected(&mut self, id: u16) { + if let Some(mut req) = self.expected_requests.remove(&RequestId(id)) { + let response = std::mem::replace(&mut req.response, vec![]); + let headers = std::mem::replace(&mut req.response_headers, vec![]); + self.fulfill_pending_request(id, req, response, headers); + } + } + + /// Add expected HTTP request. + /// + /// This method can be used to initialize expected HTTP requests and their responses + /// before running the actual code that utilizes them (for instance before calling into runtime). + /// Expected request has to be fulfilled before this struct is dropped, + /// the `response` and `response_headers` fields will be used to return results to the callers. + pub fn expect_request(&mut self, id: u16, expected: PendingRequest) { + self.expected_requests.insert(RequestId(id), expected); + } +} + +impl Drop for State { + fn drop(&mut self) { + if !self.expected_requests.is_empty() { + panic!("Unfulfilled expected requests: {:?}", self.expected_requests); + } + } } /// Implementation of offchain externalities used for tests. #[derive(Clone, Default, Debug)] pub struct TestOffchainExt(pub Arc>); +impl TestOffchainExt { + /// Create new `TestOffchainExt` and a reference to the internal state. + pub fn new() -> (Self, Arc>) { + let ext = Self::default(); + let state = ext.0.clone(); + (ext, state) + } +} + impl offchain::Externalities for TestOffchainExt { fn submit_transaction(&mut self, _ex: Vec) -> Result<(), ()> { unimplemented!("not needed in tests so far") @@ -129,21 +171,34 @@ impl offchain::Externalities for TestOffchainExt { unimplemented!("not needed in tests so far") } - fn local_storage_set(&mut self, _key: &[u8], _value: &[u8]) { - unimplemented!("not needed in tests so far") + fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) { + let mut state = self.0.write(); + match kind { + StorageKind::LOCAL => &mut state.local_storage, + StorageKind::PERSISTENT => &mut state.persistent_storage, + }.set(b"", key, value); } fn local_storage_compare_and_set( &mut self, - _key: &[u8], - _old_value: &[u8], - _new_value: &[u8] - ) { - unimplemented!("not needed in tests so far") + kind: StorageKind, + key: &[u8], + old_value: &[u8], + new_value: &[u8] + ) -> bool { + let mut state = self.0.write(); + match kind { + StorageKind::LOCAL => &mut state.local_storage, + StorageKind::PERSISTENT => &mut state.persistent_storage, + }.compare_and_set(b"", key, old_value, new_value) } - fn local_storage_get(&mut self, _key: &[u8]) -> Option> { - unimplemented!("not needed in tests so far") + fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option> { + let state = self.0.read(); + match kind { + StorageKind::LOCAL => &state.local_storage, + StorageKind::PERSISTENT => &state.persistent_storage, + }.get(b"", key) } fn http_request_start(&mut self, method: &str, uri: &str, meta: &[u8]) -> Result { @@ -180,15 +235,21 @@ impl offchain::Externalities for TestOffchainExt { _deadline: Option ) -> Result<(), HttpError> { let mut state = self.0.write(); - if let Some(req) = state.requests.get_mut(&request_id) { + + let sent = { + let req = state.requests.get_mut(&request_id).ok_or(HttpError::IoError)?; + req.body.extend(chunk); if chunk.is_empty() { req.sent = true; } - req.body.extend(chunk); - Ok(()) - } else { - Err(HttpError::IoError) + req.sent + }; + + if sent { + state.fulfill_expected(request_id.0); } + + Ok(()) } fn http_response_wait( diff --git a/substrate/core/primitives/src/offchain.rs b/substrate/core/primitives/src/offchain.rs index 7d54c9d61e..7c068ca886 100644 --- a/substrate/core/primitives/src/offchain.rs +++ b/substrate/core/primitives/src/offchain.rs @@ -19,6 +19,37 @@ use rstd::prelude::{Vec, Box}; use rstd::convert::TryFrom; +/// A type of supported crypto. +#[derive(Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "std", derive(Debug))] +#[repr(C)] +pub enum StorageKind { + /// Persistent storage is non-revertible and not fork-aware. It means that any value + /// set by the offchain worker triggered at block `N(hash1)` is persisted even + /// if that block is reverted as non-canonical and is available for the worker + /// that is re-run at block `N(hash2)`. + /// This storage can be used by offchain workers to handle forks + /// and coordinate offchain workers running on different forks. + PERSISTENT = 1, + /// Local storage is revertible and fork-aware. It means that any value + /// set by the offchain worker triggered at block `N(hash1)` is reverted + /// if that block is reverted as non-canonical and is NOT available for the worker + /// that is re-run at block `N(hash2)`. + LOCAL = 2, +} + +impl TryFrom for StorageKind { + type Error = (); + + fn try_from(kind: u32) -> Result { + match kind { + e if e == u32::from(StorageKind::PERSISTENT as u8) => Ok(StorageKind::PERSISTENT), + e if e == u32::from(StorageKind::LOCAL as u8) => Ok(StorageKind::LOCAL), + _ => Err(()), + } + } +} + /// A type of supported crypto. #[derive(Clone, Copy, PartialEq, Eq)] #[cfg_attr(feature = "std", derive(Debug))] @@ -37,7 +68,7 @@ impl TryFrom for CryptoKind { match kind { e if e == u32::from(CryptoKind::Sr25519 as u8) => Ok(CryptoKind::Sr25519), e if e == u32::from(CryptoKind::Ed25519 as u8) => Ok(CryptoKind::Ed25519), - _ => Err(()) + _ => Err(()), } } } @@ -227,23 +258,31 @@ pub trait Externalities { /// /// Note this storage is not part of the consensus, it's only accessible by /// offchain worker tasks running on the same machine. It IS persisted between runs. - fn local_storage_set(&mut self, key: &[u8], value: &[u8]); + fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]); /// Sets a value in the local storage if it matches current value. /// /// Since multiple offchain workers may be running concurrently, to prevent /// data races use CAS to coordinate between them. /// + /// Returns `true` if the value has been set, `false` otherwise. + /// /// Note this storage is not part of the consensus, it's only accessible by /// offchain worker tasks running on the same machine. It IS persisted between runs. - fn local_storage_compare_and_set(&mut self, key: &[u8], old_value: &[u8], new_value: &[u8]); + fn local_storage_compare_and_set( + &mut self, + kind: StorageKind, + key: &[u8], + old_value: &[u8], + new_value: &[u8], + ) -> bool; /// Gets a value from the local storage. /// /// If the value does not exist in the storage `None` will be returned. /// Note this storage is not part of the consensus, it's only accessible by /// offchain worker tasks running on the same machine. It IS persisted between runs. - fn local_storage_get(&mut self, key: &[u8]) -> Option>; + fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option>; /// Initiaties a http request given HTTP verb and the URL. /// @@ -348,16 +387,22 @@ impl Externalities for Box { (&mut **self).random_seed() } - fn local_storage_set(&mut self, key: &[u8], value: &[u8]) { - (&mut **self).local_storage_set(key, value) + fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) { + (&mut **self).local_storage_set(kind, key, value) } - fn local_storage_compare_and_set(&mut self, key: &[u8], old_value: &[u8], new_value: &[u8]) { - (&mut **self).local_storage_compare_and_set(key, old_value, new_value) + fn local_storage_compare_and_set( + &mut self, + kind: StorageKind, + key: &[u8], + old_value: &[u8], + new_value: &[u8], + ) -> bool { + (&mut **self).local_storage_compare_and_set(kind, key, old_value, new_value) } - fn local_storage_get(&mut self, key: &[u8]) -> Option> { - (&mut **self).local_storage_get(key) + fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option> { + (&mut **self).local_storage_get(kind, key) } fn http_request_start(&mut self, method: &str, uri: &str, meta: &[u8]) -> Result { diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index e34da42bcf..0c55df67dc 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -121,6 +121,11 @@ pub type ComponentClient = Client< ::RuntimeApi, >; +/// A offchain workers storage backend type. +pub type ComponentOffchainStorage = < + ::Backend as client::backend::Backend, Blake2Hasher> +>::OffchainStorage; + /// Block type for `Components` pub type ComponentBlock = <::Factory as ServiceFactory>::Block; @@ -259,7 +264,11 @@ impl MaintainTransactionPool for C where pub trait OffchainWorker { fn offchain_workers( number: &FactoryBlockNumber, - offchain: &offchain::OffchainWorkers, ComponentBlock>, + offchain: &offchain::OffchainWorkers< + ComponentClient, + ComponentOffchainStorage, + ComponentBlock + >, pool: &Arc>, ) -> error::Result + Send>>; } @@ -270,7 +279,11 @@ impl OffchainWorker for C where { fn offchain_workers( number: &FactoryBlockNumber, - offchain: &offchain::OffchainWorkers, ComponentBlock>, + offchain: &offchain::OffchainWorkers< + ComponentClient, + ComponentOffchainStorage, + ComponentBlock + >, pool: &Arc>, ) -> error::Result + Send>> { Ok(Box::new(offchain.on_block_imported(number, pool))) diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index 3cadd1d406..cff77059fe 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -32,7 +32,7 @@ use std::time::Duration; use futures::sync::mpsc; use parking_lot::Mutex; -use client::{BlockchainEvents, backend::Backend}; +use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; use exit_future::Signal; use futures::prelude::*; use keystore::Store as Keystore; @@ -51,11 +51,10 @@ pub use chain_spec::{ChainSpec, Properties}; pub use transaction_pool::txpool::{ self, Pool as TransactionPool, Options as TransactionPoolOptions, ChainApi, IntoPoolError }; -use client::runtime_api::BlockT; pub use client::FinalityNotifications; pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend, - LightExecutor, Components, PoolApi, ComponentClient, + LightExecutor, Components, PoolApi, ComponentClient, ComponentOffchainStorage, ComponentBlock, FullClient, LightClient, FullComponents, LightComponents, CodeExecutor, NetworkService, FactoryChainSpec, FactoryBlock, FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis, @@ -94,8 +93,12 @@ pub struct Service { pub config: FactoryFullConfiguration, _rpc: Box, _telemetry: Option, - _offchain_workers: Option, ComponentBlock>>>, _telemetry_on_connect_sinks: Arc>>>, + _offchain_workers: Option, + ComponentOffchainStorage, + ComponentBlock> + >>, } /// Creates bare client without any networking. @@ -234,10 +237,20 @@ impl Service { .select(exit.clone()) .then(|_| Ok(())))); - let offchain_workers = if config.offchain_worker { - Some(Arc::new(offchain::OffchainWorkers::new(client.clone()))) - } else { - None + #[allow(deprecated)] + let offchain_storage = client.backend().offchain_storage(); + let offchain_workers = match (config.offchain_worker, offchain_storage) { + (true, Some(db)) => { + Some(Arc::new(offchain::OffchainWorkers::new( + client.clone(), + db, + ))) + }, + (true, None) => { + log::warn!("Offchain workers disabled, due to lack of offchain storage support in backend."); + None + }, + _ => None, }; { diff --git a/substrate/core/sr-io/src/lib.rs b/substrate/core/sr-io/src/lib.rs index cd9b43798b..47df468d20 100644 --- a/substrate/core/sr-io/src/lib.rs +++ b/substrate/core/sr-io/src/lib.rs @@ -33,7 +33,12 @@ use rstd::vec::Vec; pub use codec; pub use primitives::Blake2Hasher; -use primitives::offchain::{Timestamp, HttpRequestId, HttpRequestStatus, HttpError, CryptoKind, CryptoKeyId}; +use primitives::offchain::{ + Timestamp, + HttpRequestId, HttpRequestStatus, HttpError, + CryptoKind, CryptoKeyId, + StorageKind, +}; /// Error verifying ECDSA signature pub enum EcdsaVerifyError { @@ -283,23 +288,25 @@ export_api! { /// /// Note this storage is not part of the consensus, it's only accessible by /// offchain worker tasks running on the same machine. It IS persisted between runs. - fn local_storage_set(key: &[u8], value: &[u8]); + fn local_storage_set(kind: StorageKind, key: &[u8], value: &[u8]); /// Sets a value in the local storage if it matches current value. /// /// Since multiple offchain workers may be running concurrently, to prevent /// data races use CAS to coordinate between them. /// + /// Returns `true` if the value has been set, `false` otherwise. + /// /// Note this storage is not part of the consensus, it's only accessible by /// offchain worker tasks running on the same machine. It IS persisted between runs. - fn local_storage_compare_and_set(key: &[u8], old_value: &[u8], new_value: &[u8]); + fn local_storage_compare_and_set(kind: StorageKind, key: &[u8], old_value: &[u8], new_value: &[u8]) -> bool; /// Gets a value from the local storage. /// /// If the value does not exist in the storage `None` will be returned. /// Note this storage is not part of the consensus, it's only accessible by /// offchain worker tasks running on the same machine. It IS persisted between runs. - fn local_storage_get(key: &[u8]) -> Option>; + fn local_storage_get(kind: StorageKind, key: &[u8]) -> Option>; /// Initiaties a http request given HTTP verb and the URL. /// diff --git a/substrate/core/sr-io/src/offchain/http.rs b/substrate/core/sr-io/src/offchain/http.rs index 0708f83717..6685dd023f 100644 --- a/substrate/core/sr-io/src/offchain/http.rs +++ b/substrate/core/sr-io/src/offchain/http.rs @@ -486,9 +486,8 @@ mod tests { #[test] fn should_send_a_basic_request_and_get_response() { - let offchain = testing::TestOffchainExt::default(); + let (offchain, state) = testing::TestOffchainExt::new(); let mut t = TestExternalities::default(); - let state = offchain.0.clone(); t.set_offchain_externalities(offchain); with_externalities(&mut t, || { @@ -528,9 +527,8 @@ mod tests { #[test] fn should_send_a_post_request() { - let offchain = testing::TestOffchainExt::default(); + let (offchain, state) = testing::TestOffchainExt::new(); let mut t = TestExternalities::default(); - let state = offchain.0.clone(); t.set_offchain_externalities(offchain); with_externalities(&mut t, || { diff --git a/substrate/core/sr-io/with_std.rs b/substrate/core/sr-io/with_std.rs index 34bfc22b9d..aee2dc1bc8 100644 --- a/substrate/core/sr-io/with_std.rs +++ b/substrate/core/sr-io/with_std.rs @@ -305,7 +305,7 @@ impl OffchainApi for () { }, "timestamp can be called only in the offchain worker context") } - fn sleep_until(deadline: Timestamp) { + fn sleep_until(deadline: offchain::Timestamp) { with_offchain(|ext| { ext.sleep_until(deadline) }, "sleep_until can be called only in the offchain worker context") @@ -317,21 +317,26 @@ impl OffchainApi for () { }, "random_seed can be called only in the offchain worker context") } - fn local_storage_set(key: &[u8], value: &[u8]) { + fn local_storage_set(kind: offchain::StorageKind, key: &[u8], value: &[u8]) { with_offchain(|ext| { - ext.local_storage_set(key, value) + ext.local_storage_set(kind, key, value) }, "local_storage_set can be called only in the offchain worker context") } - fn local_storage_compare_and_set(key: &[u8], old_value: &[u8], new_value: &[u8]) { + fn local_storage_compare_and_set( + kind: offchain::StorageKind, + key: &[u8], + old_value: &[u8], + new_value: &[u8], + ) -> bool { with_offchain(|ext| { - ext.local_storage_compare_and_set(key, old_value, new_value) + ext.local_storage_compare_and_set(kind, key, old_value, new_value) }, "local_storage_compare_and_set can be called only in the offchain worker context") } - fn local_storage_get(key: &[u8]) -> Option> { + fn local_storage_get(kind: offchain::StorageKind, key: &[u8]) -> Option> { with_offchain(|ext| { - ext.local_storage_get(key) + ext.local_storage_get(kind, key) }, "local_storage_get can be called only in the offchain worker context") } diff --git a/substrate/core/sr-io/without_std.rs b/substrate/core/sr-io/without_std.rs index b1dc10d1b8..ad1b26b8c1 100644 --- a/substrate/core/sr-io/without_std.rs +++ b/substrate/core/sr-io/without_std.rs @@ -459,17 +459,22 @@ pub mod ext { fn ext_random_seed(data: *mut u8); /// Write a value to local storage. - fn ext_local_storage_set(key: *const u8, key_len: u32, value: *const u8, value_len: u32); + fn ext_local_storage_set(kind: u32, key: *const u8, key_len: u32, value: *const u8, value_len: u32); /// Write a value to local storage in atomic fashion. + /// + /// # Returns + /// - `0` in case the value has been set + /// - `1` if the `old_value` didn't match fn ext_local_storage_compare_and_set( + kind: u32, key: *const u8, key_len: u32, old_value: *const u8, old_value_len: u32, new_value: *const u8, new_value_len: u32 - ); + ) -> u32; /// Read a value from local storage. /// @@ -478,7 +483,7 @@ pub mod ext { /// /// - 0 if the value has not been found, the `value_len` is set to `u32::max_value`. /// - Otherwise, pointer to the value in memory. `value_len` contains the length of the value. - fn ext_local_storage_get(key: *const u8, key_len: u32, value_len: *mut u32) -> *mut u8; + fn ext_local_storage_get(kind: u32, key: *const u8, key_len: u32, value_len: *mut u32) -> *mut u8; /// Initiaties a http request. /// @@ -932,7 +937,7 @@ impl OffchainApi for () { }) } - fn sleep_until(deadline: Timestamp) { + fn sleep_until(deadline: offchain::Timestamp) { unsafe { ext_sleep_until.get()(deadline.unix_millis()) } @@ -946,9 +951,10 @@ impl OffchainApi for () { result } - fn local_storage_set(key: &[u8], value: &[u8]) { + fn local_storage_set(kind: offchain::StorageKind, key: &[u8], value: &[u8]) { unsafe { ext_local_storage_set.get()( + kind as u8 as u32, key.as_ptr(), key.len() as u32, value.as_ptr(), @@ -957,23 +963,25 @@ impl OffchainApi for () { } } - fn local_storage_compare_and_set(key: &[u8], old_value: &[u8], new_value: &[u8]) { + fn local_storage_compare_and_set(kind: offchain::StorageKind, key: &[u8], old_value: &[u8], new_value: &[u8]) -> bool { unsafe { ext_local_storage_compare_and_set.get()( + kind as u8 as u32, key.as_ptr(), key.len() as u32, old_value.as_ptr(), old_value.len() as u32, new_value.as_ptr(), new_value.len() as u32, - ) + ) == 0 } } - fn local_storage_get(key: &[u8]) -> Option> { + fn local_storage_get(kind: offchain::StorageKind, key: &[u8]) -> Option> { let mut len = 0u32; unsafe { let ptr = ext_local_storage_get.get()( + kind as u8 as u32, key.as_ptr(), key.len() as u32, &mut len, diff --git a/substrate/core/state-machine/src/lib.rs b/substrate/core/state-machine/src/lib.rs index 59d3239ff9..7cff304675 100644 --- a/substrate/core/state-machine/src/lib.rs +++ b/substrate/core/state-machine/src/lib.rs @@ -288,15 +288,21 @@ impl offchain::Externalities for NeverOffchainExt { unreachable!() } - fn local_storage_set(&mut self, _key: &[u8], _value: &[u8]) { + fn local_storage_set(&mut self, _kind: offchain::StorageKind, _key: &[u8], _value: &[u8]) { unreachable!() } - fn local_storage_compare_and_set(&mut self, _key: &[u8], _old_value: &[u8], _new_value: &[u8]) { + fn local_storage_compare_and_set( + &mut self, + _kind: offchain::StorageKind, + _key: &[u8], + _old_value: &[u8], + _new_value: &[u8], + ) -> bool { unreachable!() } - fn local_storage_get(&mut self, _key: &[u8]) -> Option> { + fn local_storage_get(&mut self, _kind: offchain::StorageKind, _key: &[u8]) -> Option> { unreachable!() } diff --git a/substrate/node/runtime/src/lib.rs b/substrate/node/runtime/src/lib.rs index 5a43ccfda2..4f360c9580 100644 --- a/substrate/node/runtime/src/lib.rs +++ b/substrate/node/runtime/src/lib.rs @@ -67,7 +67,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // to equal spec_version. If only runtime implementation changes and behavior does not, then // leave spec_version as is and increment impl_version. spec_version: 102, - impl_version: 102, + impl_version: 103, apis: RUNTIME_API_VERSIONS, };