Make offchain indexing work (#7940)

* Make offchain indexing work

This fixes some bugs with offchain indexing to make it actually working ;)

* Fix tests

* Fix browser build

* Update client/db/src/offchain.rs

Co-authored-by: cheme <emericchevalier.pro@gmail.com>

* Remove seperation between prefix and key

Co-authored-by: cheme <emericchevalier.pro@gmail.com>
This commit is contained in:
Bastian Köcher
2021-01-21 13:12:42 +01:00
committed by GitHub
parent a3a9e7667c
commit cd0ad4805d
24 changed files with 188 additions and 306 deletions
+1 -2
View File
@@ -28,7 +28,7 @@ use sp_state_machine::{
};
use sc_executor::{RuntimeVersion, NativeVersion};
use sp_externalities::Extensions;
use sp_core::{NativeOrEncoded,offchain::storage::OffchainOverlayedChanges};
use sp_core::NativeOrEncoded;
use sp_api::{ProofRecorder, InitializeBlock, StorageTransactionCache};
use crate::execution_extensions::ExecutionExtensions;
@@ -86,7 +86,6 @@ pub trait CallExecutor<B: BlockT> {
method: &str,
call_data: &[u8],
changes: &RefCell<OverlayedChanges>,
offchain_changes: &RefCell<OffchainOverlayedChanges>,
storage_transaction_cache: Option<&RefCell<
StorageTransactionCache<B, <Self::Backend as crate::backend::Backend<B>>::State>,
>>,
+5 -7
View File
@@ -681,14 +681,12 @@ pub struct BlockImportOperation<Block: BlockT> {
impl<Block: BlockT> BlockImportOperation<Block> {
fn apply_offchain(&mut self, transaction: &mut Transaction<DbHash>) {
for ((prefix, key), value_operation) in self.offchain_storage_updates.drain() {
let key: Vec<u8> = prefix
.into_iter()
.chain(sp_core::sp_std::iter::once(b'/'))
.chain(key.into_iter())
.collect();
let key = crate::offchain::concatenate_prefix_and_key(&prefix, &key);
match value_operation {
OffchainOverlayedChange::SetValue(val) => transaction.set_from_vec(columns::OFFCHAIN, &key, val),
OffchainOverlayedChange::Remove => transaction.remove(columns::OFFCHAIN, &key),
OffchainOverlayedChange::SetValue(val) =>
transaction.set_from_vec(columns::OFFCHAIN, &key, val),
OffchainOverlayedChange::Remove =>
transaction.remove(columns::OFFCHAIN, &key),
}
}
}
+15 -12
View File
@@ -18,10 +18,7 @@
//! RocksDB-based offchain workers local storage.
use std::{
collections::HashMap,
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};
use crate::{columns, Database, DbHash, Transaction};
use parking_lot::Mutex;
@@ -43,7 +40,7 @@ impl std::fmt::Debug for LocalStorage {
impl LocalStorage {
/// Create new offchain storage for tests (backed by memorydb)
#[cfg(any(test, feature = "test-helpers"))]
#[cfg(any(feature = "test-helpers", test))]
pub fn new_test() -> Self {
let db = kvdb_memorydb::create(crate::utils::NUM_COLUMNS);
let db = sp_database::as_database(db);
@@ -61,9 +58,8 @@ impl LocalStorage {
impl sp_core::offchain::OffchainStorage for LocalStorage {
fn set(&mut self, prefix: &[u8], key: &[u8], value: &[u8]) {
let key: Vec<u8> = prefix.iter().chain(key).cloned().collect();
let mut tx = Transaction::new();
tx.set(columns::OFFCHAIN, &key, value);
tx.set(columns::OFFCHAIN, &concatenate_prefix_and_key(prefix, key), value);
if let Err(err) = self.db.commit(tx) {
error!("Error setting on local storage: {}", err)
@@ -71,9 +67,8 @@ impl sp_core::offchain::OffchainStorage for LocalStorage {
}
fn remove(&mut self, prefix: &[u8], key: &[u8]) {
let key: Vec<u8> = prefix.iter().chain(key).cloned().collect();
let mut tx = Transaction::new();
tx.remove(columns::OFFCHAIN, &key);
tx.remove(columns::OFFCHAIN, &concatenate_prefix_and_key(prefix, key));
if let Err(err) = self.db.commit(tx) {
error!("Error removing on local storage: {}", err)
@@ -81,8 +76,7 @@ impl sp_core::offchain::OffchainStorage for LocalStorage {
}
fn get(&self, prefix: &[u8], key: &[u8]) -> Option<Vec<u8>> {
let key: Vec<u8> = prefix.iter().chain(key).cloned().collect();
self.db.get(columns::OFFCHAIN, &key)
self.db.get(columns::OFFCHAIN, &concatenate_prefix_and_key(prefix, key))
}
fn compare_and_set(
@@ -92,7 +86,7 @@ impl sp_core::offchain::OffchainStorage for LocalStorage {
old_value: Option<&[u8]>,
new_value: &[u8],
) -> bool {
let key: Vec<u8> = prefix.iter().chain(item_key).cloned().collect();
let key = concatenate_prefix_and_key(prefix, item_key);
let key_lock = {
let mut locks = self.locks.lock();
locks.entry(key.clone()).or_default().clone()
@@ -122,6 +116,15 @@ impl sp_core::offchain::OffchainStorage for LocalStorage {
}
}
/// Concatenate the prefix and key to create an offchain key in the db.
pub(crate) fn concatenate_prefix_and_key(prefix: &[u8], key: &[u8]) -> Vec<u8> {
prefix
.iter()
.chain(key.into_iter())
.cloned()
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
@@ -477,8 +477,8 @@ fn offchain_index(wasm_method: WasmExecutionMethod) {
use sp_core::offchain::storage::OffchainOverlayedChange;
assert_eq!(
ext.ext()
.get_offchain_storage_changes()
ext.overlayed_changes()
.offchain_overlay()
.get(sp_core::offchain::STORAGE_PREFIX, b"k"),
Some(OffchainOverlayedChange::SetValue(b"v".to_vec()))
);
@@ -25,7 +25,6 @@ use std::{
use codec::{Encode, Decode};
use sp_core::{
convert_hash, NativeOrEncoded, traits::{CodeExecutor, SpawnNamed},
offchain::storage::OffchainOverlayedChanges,
};
use sp_runtime::{
generic::BlockId, traits::{One, Block as BlockT, Header as HeaderT, HashFor},
@@ -113,7 +112,6 @@ impl<Block, B, Local> CallExecutor<Block> for
method: &str,
call_data: &[u8],
changes: &RefCell<OverlayedChanges>,
offchain_changes: &RefCell<OffchainOverlayedChanges>,
_: Option<&RefCell<StorageTransactionCache<Block, B::State>>>,
initialize_block: InitializeBlock<'a, Block>,
_manager: ExecutionManager<EM>,
@@ -140,7 +138,6 @@ impl<Block, B, Local> CallExecutor<Block> for
method,
call_data,
changes,
offchain_changes,
None,
initialize_block,
ExecutionManager::NativeWhenPossible,
+3 -1
View File
@@ -37,10 +37,12 @@ hyper = "0.13.9"
hyper-rustls = "0.21.0"
[dev-dependencies]
sc-client-db = { version = "0.8.0", default-features = true, path = "../db/" }
sc-client-db = { version = "0.8.0", default-features = true, path = "../db" }
sc-block-builder = { version = "0.8.0", path = "../block-builder" }
sc-transaction-pool = { version = "2.0.0", path = "../transaction-pool" }
sp-transaction-pool = { version = "2.0.0", path = "../../primitives/transaction-pool" }
sp-tracing = { version = "2.0.0", path = "../../primitives/tracing" }
sp-consensus = { version = "0.8.1", path = "../../primitives/consensus/common" }
substrate-test-runtime-client = { version = "2.0.0", path = "../../test-utils/runtime/client" }
tokio = "0.2"
lazy_static = "1.4.0"
+45 -2
View File
@@ -60,7 +60,7 @@ pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
pub trait NetworkProvider: NetworkStateInfo {
/// Set the authorized peers.
fn set_authorized_peers(&self, peers: HashSet<PeerId>);
/// Set the authorized only flag.
fn set_authorized_only(&self, reserved_only: bool);
}
@@ -238,9 +238,15 @@ mod tests {
use super::*;
use std::sync::Arc;
use sc_network::{Multiaddr, PeerId};
use substrate_test_runtime_client::{TestClient, runtime::Block};
use substrate_test_runtime_client::{
TestClient, runtime::Block, TestClientBuilderExt,
DefaultTestClientBuilderExt, ClientBlockImportExt,
};
use sc_transaction_pool::{BasicPool, FullChainApi};
use sp_transaction_pool::{TransactionPool, InPoolTransaction};
use sp_consensus::BlockOrigin;
use sc_client_api::Backend as _;
use sc_block_builder::BlockBuilderProvider as _;
struct TestNetwork();
@@ -307,4 +313,41 @@ mod tests {
assert_eq!(pool.0.status().ready, 1);
assert_eq!(pool.0.ready().next().unwrap().is_propagable(), false);
}
#[test]
fn offchain_index_set_and_clear_works() {
sp_tracing::try_init_simple();
let (client, backend) =
substrate_test_runtime_client::TestClientBuilder::new()
.enable_offchain_indexing_api()
.build_with_backend();
let mut client = Arc::new(client);
let offchain_db = backend.offchain_storage().unwrap();
let key = &b"hello"[..];
let value = &b"world"[..];
let mut block_builder = client.new_block(Default::default()).unwrap();
block_builder.push(
substrate_test_runtime_client::runtime::Extrinsic::OffchainIndexSet(
key.to_vec(),
value.to_vec(),
),
).unwrap();
let block = block_builder.build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
assert_eq!(value, &offchain_db.get(sp_offchain::STORAGE_PREFIX, &key).unwrap());
let mut block_builder = client.new_block(Default::default()).unwrap();
block_builder.push(
substrate_test_runtime_client::runtime::Extrinsic::OffchainIndexClear(key.to_vec()),
).unwrap();
let block = block_builder.build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
assert!(offchain_db.get(sp_offchain::STORAGE_PREFIX, &key).is_none());
}
}
@@ -29,7 +29,6 @@ use sc_executor::{RuntimeVersion, RuntimeInfo, NativeVersion};
use sp_externalities::Extensions;
use sp_core::{
NativeOrEncoded, NeverNativeValue, traits::{CodeExecutor, SpawnNamed, RuntimeCode},
offchain::storage::OffchainOverlayedChanges,
};
use sp_api::{ProofRecorder, InitializeBlock, StorageTransactionCache};
use sc_client_api::{backend, call_executor::CallExecutor};
@@ -127,11 +126,6 @@ where
extensions: Option<Extensions>,
) -> sp_blockchain::Result<Vec<u8>> {
let mut changes = OverlayedChanges::default();
let mut offchain_changes = if self.client_config.offchain_indexing_api {
OffchainOverlayedChanges::enabled()
} else {
OffchainOverlayedChanges::disabled()
};
let changes_trie = backend::changes_tries_state_at_block(
id, self.backend.changes_trie_storage()
)?;
@@ -145,7 +139,6 @@ where
&state,
changes_trie,
&mut changes,
&mut offchain_changes,
&self.executor,
method,
call_data,
@@ -176,7 +169,6 @@ where
method: &str,
call_data: &[u8],
changes: &RefCell<OverlayedChanges>,
offchain_changes: &RefCell<OffchainOverlayedChanges>,
storage_transaction_cache: Option<&RefCell<
StorageTransactionCache<Block, B::State>
>>,
@@ -201,7 +193,6 @@ where
let mut state = self.backend.state_at(*at)?;
let changes = &mut *changes.borrow_mut();
let offchain_changes = &mut *offchain_changes.borrow_mut();
match recorder {
Some(recorder) => {
@@ -213,7 +204,6 @@ where
let state_runtime_code = sp_state_machine::backend::BackendRuntimeCode::new(&trie_state);
// It is important to extract the runtime code here before we create the proof
// recorder.
let runtime_code = state_runtime_code.runtime_code()
.map_err(sp_blockchain::Error::RuntimeCode)?;
let runtime_code = self.check_override(runtime_code, at)?;
@@ -227,7 +217,6 @@ where
&backend,
changes_trie_state,
changes,
offchain_changes,
&self.executor,
method,
call_data,
@@ -249,7 +238,6 @@ where
&state,
changes_trie_state,
changes,
offchain_changes,
&self.executor,
method,
call_data,
@@ -264,7 +252,6 @@ where
fn runtime_version(&self, id: &BlockId<Block>) -> sp_blockchain::Result<RuntimeVersion> {
let mut overlay = OverlayedChanges::default();
let mut offchain_overlay = OffchainOverlayedChanges::default();
let changes_trie_state = backend::changes_tries_state_at_block(
id,
self.backend.changes_trie_storage(),
@@ -273,7 +260,6 @@ where
let mut cache = StorageTransactionCache::<Block, B::State>::default();
let mut ext = Ext::new(
&mut overlay,
&mut offchain_overlay,
&mut cache,
&state,
changes_trie_state,
@@ -1654,7 +1654,6 @@ impl<B, E, Block, RA> CallApiAt<Block> for Client<B, E, Block, RA> where
params.function,
&params.arguments,
params.overlayed_changes,
params.offchain_changes,
Some(params.storage_transaction_cache),
params.initialize_block,
manager,
@@ -36,7 +36,7 @@ use parking_lot::Mutex;
use substrate_test_runtime_client::{
runtime::{Hash, Block, Header}, TestClient, ClientBlockImportExt,
};
use sp_api::{InitializeBlock, StorageTransactionCache, ProofRecorder, OffchainOverlayedChanges};
use sp_api::{InitializeBlock, StorageTransactionCache, ProofRecorder};
use sp_consensus::BlockOrigin;
use sc_executor::{NativeExecutor, WasmExecutionMethod, RuntimeVersion, NativeVersion};
use sp_core::{H256, NativeOrEncoded, testing::TaskExecutor};
@@ -223,7 +223,6 @@ impl CallExecutor<Block> for DummyCallExecutor {
_method: &str,
_call_data: &[u8],
_changes: &RefCell<OverlayedChanges>,
_offchain_changes: &RefCell<OffchainOverlayedChanges>,
_storage_transaction_cache: Option<&RefCell<
StorageTransactionCache<
Block,
@@ -41,7 +41,7 @@ use sp_runtime::traits::{
};
use substrate_test_runtime::TestAPI;
use sp_state_machine::backend::Backend as _;
use sp_api::{ProvideRuntimeApi, OffchainOverlayedChanges};
use sp_api::ProvideRuntimeApi;
use sp_core::{H256, ChangesTrieConfiguration, blake2_256, testing::TaskExecutor};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -163,7 +163,6 @@ fn construct_block(
};
let hash = header.hash();
let mut overlay = OverlayedChanges::default();
let mut offchain_overlay = OffchainOverlayedChanges::default();
let backend_runtime_code = sp_state_machine::backend::BackendRuntimeCode::new(&backend);
let runtime_code = backend_runtime_code.runtime_code().expect("Code is part of the backend");
let task_executor = Box::new(TaskExecutor::new());
@@ -172,7 +171,6 @@ fn construct_block(
backend,
sp_state_machine::disabled_changes_trie_state::<_, u64>(),
&mut overlay,
&mut offchain_overlay,
&executor(),
"Core_initialize_block",
&header.encode(),
@@ -188,7 +186,6 @@ fn construct_block(
backend,
sp_state_machine::disabled_changes_trie_state::<_, u64>(),
&mut overlay,
&mut offchain_overlay,
&executor(),
"BlockBuilder_apply_extrinsic",
&tx.encode(),
@@ -204,7 +201,6 @@ fn construct_block(
backend,
sp_state_machine::disabled_changes_trie_state::<_, u64>(),
&mut overlay,
&mut offchain_overlay,
&executor(),
"BlockBuilder_finalize_block",
&[],
@@ -252,13 +248,11 @@ fn construct_genesis_should_work_with_native() {
let runtime_code = backend_runtime_code.runtime_code().expect("Code is part of the backend");
let mut overlay = OverlayedChanges::default();
let mut offchain_overlay = OffchainOverlayedChanges::default();
let _ = StateMachine::new(
&backend,
sp_state_machine::disabled_changes_trie_state::<_, u64>(),
&mut overlay,
&mut offchain_overlay,
&executor(),
"Core_execute_block",
&b1data,
@@ -288,13 +282,11 @@ fn construct_genesis_should_work_with_wasm() {
let runtime_code = backend_runtime_code.runtime_code().expect("Code is part of the backend");
let mut overlay = OverlayedChanges::default();
let mut offchain_overlay = OffchainOverlayedChanges::default();
let _ = StateMachine::new(
&backend,
sp_state_machine::disabled_changes_trie_state::<_, u64>(),
&mut overlay,
&mut offchain_overlay,
&executor(),
"Core_execute_block",
&b1data,
@@ -324,13 +316,11 @@ fn construct_genesis_with_bad_transaction_should_panic() {
let runtime_code = backend_runtime_code.runtime_code().expect("Code is part of the backend");
let mut overlay = OverlayedChanges::default();
let mut offchain_overlay = OffchainOverlayedChanges::default();
let r = StateMachine::new(
&backend,
sp_state_machine::disabled_changes_trie_state::<_, u64>(),
&mut overlay,
&mut offchain_overlay,
&executor(),
"Core_execute_block",
&b1data,