mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-09 20:11:09 +00:00
Custom RPC for Merkle Mountain Range pallet (#8137)
* Add MMR custom RPC.
* Change RuntimeApi to avoid hardcoding leaf type.
* Properly implement the new RuntimeAPI and wire up RPC.
* Extract Offchain DB as separate execution extension.
* Enable offchain DB access for offchain calls.
* Fix offchain_election tests.
* Skip block initialisation for proof generation.
* Fix integration test setup.
* Fix offchain tests. Not sure how I missed them earlier 🤷.
* Fix long line.
* One more test missing.
* Update mock for multi-phase.
* Address review grumbbles.
* Address review grumbles.
* Fix line width of a comment
This commit is contained in:
@@ -26,7 +26,7 @@ use std::sync::{Weak, Arc};
|
||||
use codec::Decode;
|
||||
use sp_core::{
|
||||
ExecutionContext,
|
||||
offchain::{self, OffchainExt, TransactionPoolExt},
|
||||
offchain::{self, OffchainWorkerExt, TransactionPoolExt, OffchainDbExt},
|
||||
};
|
||||
use sp_keystore::{KeystoreExt, SyncCryptoStorePtr};
|
||||
use sp_runtime::{
|
||||
@@ -76,6 +76,18 @@ impl ExtensionsFactory for () {
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a Offchain DB accessor object.
|
||||
pub trait DbExternalitiesFactory: Send + Sync {
|
||||
/// Create [`offchain::DbExternalities`] instance.
|
||||
fn create(&self) -> Box<dyn offchain::DbExternalities>;
|
||||
}
|
||||
|
||||
impl<T: offchain::DbExternalities + Clone + Sync + Send + 'static> DbExternalitiesFactory for T {
|
||||
fn create(&self) -> Box<dyn offchain::DbExternalities> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// A producer of execution extensions for offchain calls.
|
||||
///
|
||||
/// This crate aggregates extensions available for the offchain calls
|
||||
@@ -84,6 +96,7 @@ impl ExtensionsFactory for () {
|
||||
pub struct ExecutionExtensions<Block: traits::Block> {
|
||||
strategies: ExecutionStrategies,
|
||||
keystore: Option<SyncCryptoStorePtr>,
|
||||
offchain_db: Option<Box<dyn DbExternalitiesFactory>>,
|
||||
// FIXME: these two are only RwLock because of https://github.com/paritytech/substrate/issues/4587
|
||||
// remove when fixed.
|
||||
// To break retain cycle between `Client` and `TransactionPool` we require this
|
||||
@@ -99,6 +112,7 @@ impl<Block: traits::Block> Default for ExecutionExtensions<Block> {
|
||||
Self {
|
||||
strategies: Default::default(),
|
||||
keystore: None,
|
||||
offchain_db: None,
|
||||
transaction_pool: RwLock::new(None),
|
||||
extensions_factory: RwLock::new(Box::new(())),
|
||||
}
|
||||
@@ -110,12 +124,14 @@ impl<Block: traits::Block> ExecutionExtensions<Block> {
|
||||
pub fn new(
|
||||
strategies: ExecutionStrategies,
|
||||
keystore: Option<SyncCryptoStorePtr>,
|
||||
offchain_db: Option<Box<dyn DbExternalitiesFactory>>,
|
||||
) -> Self {
|
||||
let transaction_pool = RwLock::new(None);
|
||||
let extensions_factory = Box::new(());
|
||||
Self {
|
||||
strategies,
|
||||
keystore,
|
||||
offchain_db,
|
||||
extensions_factory: RwLock::new(extensions_factory),
|
||||
transaction_pool,
|
||||
}
|
||||
@@ -164,9 +180,22 @@ impl<Block: traits::Block> ExecutionExtensions<Block> {
|
||||
}
|
||||
}
|
||||
|
||||
if capabilities.has(offchain::Capability::OffchainDbRead) ||
|
||||
capabilities.has(offchain::Capability::OffchainDbWrite)
|
||||
{
|
||||
if let Some(offchain_db) = self.offchain_db.as_ref() {
|
||||
extensions.register(
|
||||
OffchainDbExt::new(offchain::LimitedExternalities::new(
|
||||
capabilities,
|
||||
offchain_db.create(),
|
||||
))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let ExecutionContext::OffchainCall(Some(ext)) = context {
|
||||
extensions.register(
|
||||
OffchainExt::new(offchain::LimitedExternalities::new(capabilities, ext.0)),
|
||||
OffchainWorkerExt::new(offchain::LimitedExternalities::new(capabilities, ext.0)),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ use codec::{Encode, Decode};
|
||||
use hex_literal::hex;
|
||||
use sp_core::{
|
||||
blake2_128, blake2_256, ed25519, sr25519, map, Pair,
|
||||
offchain::{OffchainExt, testing},
|
||||
offchain::{OffchainWorkerExt, OffchainDbExt, testing},
|
||||
traits::{Externalities, CallInWasm},
|
||||
};
|
||||
use sc_runtime_test::wasm_binary_unwrap;
|
||||
@@ -468,7 +468,7 @@ test_wasm_execution!(offchain_index);
|
||||
fn offchain_index(wasm_method: WasmExecutionMethod) {
|
||||
let mut ext = TestExternalities::default();
|
||||
let (offchain, _state) = testing::TestOffchainExt::new();
|
||||
ext.register_extension(OffchainExt::new(offchain));
|
||||
ext.register_extension(OffchainWorkerExt::new(offchain));
|
||||
call_in_wasm(
|
||||
"test_offchain_index_set",
|
||||
&[0],
|
||||
@@ -487,7 +487,8 @@ test_wasm_execution!(offchain_local_storage_should_work);
|
||||
fn offchain_local_storage_should_work(wasm_method: WasmExecutionMethod) {
|
||||
let mut ext = TestExternalities::default();
|
||||
let (offchain, state) = testing::TestOffchainExt::new();
|
||||
ext.register_extension(OffchainExt::new(offchain));
|
||||
ext.register_extension(OffchainDbExt::new(offchain.clone()));
|
||||
ext.register_extension(OffchainWorkerExt::new(offchain));
|
||||
assert_eq!(
|
||||
call_in_wasm(
|
||||
"test_offchain_local_storage",
|
||||
@@ -504,7 +505,7 @@ test_wasm_execution!(offchain_http_should_work);
|
||||
fn offchain_http_should_work(wasm_method: WasmExecutionMethod) {
|
||||
let mut ext = TestExternalities::default();
|
||||
let (offchain, state) = testing::TestOffchainExt::new();
|
||||
ext.register_extension(OffchainExt::new(offchain));
|
||||
ext.register_extension(OffchainWorkerExt::new(offchain));
|
||||
state.write().expect_request(testing::PendingRequest {
|
||||
method: "POST".into(),
|
||||
uri: "http://localhost:12345".into(),
|
||||
|
||||
@@ -14,23 +14,24 @@ targets = ["x86_64-unknown-linux-gnu"]
|
||||
|
||||
[dependencies]
|
||||
bytes = "0.5"
|
||||
sc-client-api = { version = "3.0.0", path = "../api" }
|
||||
sp-api = { version = "3.0.0", path = "../../primitives/api" }
|
||||
codec = { package = "parity-scale-codec", version = "2.0.0", features = ["derive"] }
|
||||
hex = "0.4"
|
||||
fnv = "1.0.6"
|
||||
futures = "0.3.9"
|
||||
futures-timer = "3.0.1"
|
||||
log = "0.4.8"
|
||||
threadpool = "1.7"
|
||||
num_cpus = "1.10"
|
||||
sp-offchain = { version = "3.0.0", path = "../../primitives/offchain" }
|
||||
codec = { package = "parity-scale-codec", version = "2.0.0", features = ["derive"] }
|
||||
parking_lot = "0.11.1"
|
||||
sp-core = { version = "3.0.0", path = "../../primitives/core" }
|
||||
rand = "0.7.2"
|
||||
sc-client-api = { version = "3.0.0", path = "../api" }
|
||||
sc-keystore = { version = "3.0.0", path = "../keystore" }
|
||||
sc-network = { version = "0.9.0", path = "../network" }
|
||||
sp-api = { version = "3.0.0", path = "../../primitives/api" }
|
||||
sp-core = { version = "3.0.0", path = "../../primitives/core" }
|
||||
sp-offchain = { version = "3.0.0", path = "../../primitives/offchain" }
|
||||
sp-runtime = { version = "3.0.0", path = "../../primitives/runtime" }
|
||||
sp-utils = { version = "3.0.0", path = "../../primitives/utils" }
|
||||
sc-network = { version = "0.9.0", path = "../network" }
|
||||
sc-keystore = { version = "3.0.0", path = "../keystore" }
|
||||
threadpool = "1.7"
|
||||
|
||||
[target.'cfg(not(target_os = "unknown"))'.dependencies]
|
||||
hyper = "0.13.9"
|
||||
|
||||
@@ -26,12 +26,11 @@ use std::{
|
||||
|
||||
use crate::NetworkProvider;
|
||||
use futures::Future;
|
||||
use log::error;
|
||||
use sc_network::{PeerId, Multiaddr};
|
||||
use codec::{Encode, Decode};
|
||||
use sp_core::OpaquePeerId;
|
||||
use sp_core::offchain::{
|
||||
Externalities as OffchainExt, HttpRequestId, Timestamp, HttpRequestStatus, HttpError,
|
||||
self, HttpRequestId, Timestamp, HttpRequestStatus, HttpError,
|
||||
OffchainStorage, OpaqueNetworkState, OpaqueMultiaddr, StorageKind,
|
||||
};
|
||||
pub use sp_offchain::STORAGE_PREFIX;
|
||||
@@ -47,22 +46,9 @@ mod http_dummy;
|
||||
|
||||
mod timestamp;
|
||||
|
||||
/// Asynchronous offchain API.
|
||||
///
|
||||
/// NOTE this is done to prevent recursive calls into the runtime (which are not supported currently).
|
||||
pub(crate) struct Api<Storage> {
|
||||
/// Offchain Workers database.
|
||||
db: Storage,
|
||||
/// A provider for substrate networking.
|
||||
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
|
||||
/// Is this node a potential validator?
|
||||
is_validator: bool,
|
||||
/// Everything HTTP-related is handled by a different struct.
|
||||
http: http::HttpApi,
|
||||
}
|
||||
|
||||
fn unavailable_yet<R: Default>(name: &str) -> R {
|
||||
error!(
|
||||
log::error!(
|
||||
target: "sc_offchain",
|
||||
"The {:?} API is not available for offchain workers yet. Follow \
|
||||
https://github.com/paritytech/substrate/issues/1458 for details", name
|
||||
);
|
||||
@@ -71,7 +57,109 @@ fn unavailable_yet<R: Default>(name: &str) -> R {
|
||||
|
||||
const LOCAL_DB: &str = "LOCAL (fork-aware) DB";
|
||||
|
||||
impl<Storage: OffchainStorage> OffchainExt for Api<Storage> {
|
||||
/// Offchain DB reference.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Db<Storage> {
|
||||
/// Persistent storage database.
|
||||
persistent: Storage,
|
||||
}
|
||||
|
||||
impl<Storage: OffchainStorage> Db<Storage> {
|
||||
/// Create new instance of Offchain DB.
|
||||
pub fn new(persistent: Storage) -> Self {
|
||||
Self { persistent }
|
||||
}
|
||||
|
||||
/// Create new instance of Offchain DB, backed by given backend.
|
||||
pub fn factory_from_backend<Backend, Block>(backend: &Backend) -> Option<
|
||||
Box<dyn sc_client_api::execution_extensions::DbExternalitiesFactory>
|
||||
> where
|
||||
Backend: sc_client_api::Backend<Block, OffchainStorage = Storage>,
|
||||
Block: sp_runtime::traits::Block,
|
||||
Storage: 'static,
|
||||
{
|
||||
sc_client_api::Backend::offchain_storage(backend).map(|db|
|
||||
Box::new(Self::new(db)) as _
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl<Storage: OffchainStorage> offchain::DbExternalities for Db<Storage> {
|
||||
fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) {
|
||||
log::debug!(
|
||||
target: "sc_offchain",
|
||||
"{:?}: Write: {:?} <= {:?}", kind, hex::encode(key), hex::encode(value)
|
||||
);
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => self.persistent.set(STORAGE_PREFIX, key, value),
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_clear(&mut self, kind: StorageKind, key: &[u8]) {
|
||||
log::debug!(
|
||||
target: "sc_offchain",
|
||||
"{:?}: Clear: {:?}", kind, hex::encode(key)
|
||||
);
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => self.persistent.remove(STORAGE_PREFIX, key),
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_compare_and_set(
|
||||
&mut self,
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8],
|
||||
) -> bool {
|
||||
log::debug!(
|
||||
target: "sc_offchain",
|
||||
"{:?}: CAS: {:?} <= {:?} vs {:?}",
|
||||
kind,
|
||||
hex::encode(key),
|
||||
hex::encode(new_value),
|
||||
old_value.as_ref().map(hex::encode),
|
||||
);
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => {
|
||||
self.persistent.compare_and_set(STORAGE_PREFIX, key, old_value, new_value)
|
||||
},
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option<Vec<u8>> {
|
||||
let result = match kind {
|
||||
StorageKind::PERSISTENT => self.persistent.get(STORAGE_PREFIX, key),
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
};
|
||||
log::debug!(
|
||||
target: "sc_offchain",
|
||||
"{:?}: Read: {:?} => {:?}",
|
||||
kind,
|
||||
hex::encode(key),
|
||||
result.as_ref().map(hex::encode)
|
||||
);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/// Asynchronous offchain API.
|
||||
///
|
||||
/// NOTE this is done to prevent recursive calls into the runtime
|
||||
/// (which are not supported currently).
|
||||
pub(crate) struct Api {
|
||||
/// A provider for substrate networking.
|
||||
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
|
||||
/// Is this node a potential validator?
|
||||
is_validator: bool,
|
||||
/// Everything HTTP-related is handled by a different struct.
|
||||
http: http::HttpApi,
|
||||
}
|
||||
|
||||
impl offchain::Externalities for Api {
|
||||
fn is_validator(&self) -> bool {
|
||||
self.is_validator
|
||||
}
|
||||
@@ -98,42 +186,6 @@ impl<Storage: OffchainStorage> OffchainExt for Api<Storage> {
|
||||
rand::random()
|
||||
}
|
||||
|
||||
fn local_storage_set(&mut self, kind: StorageKind, key: &[u8], value: &[u8]) {
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => self.db.set(STORAGE_PREFIX, key, value),
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_clear(&mut self, kind: StorageKind, key: &[u8]) {
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => self.db.remove(STORAGE_PREFIX, key),
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_compare_and_set(
|
||||
&mut self,
|
||||
kind: StorageKind,
|
||||
key: &[u8],
|
||||
old_value: Option<&[u8]>,
|
||||
new_value: &[u8],
|
||||
) -> bool {
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => {
|
||||
self.db.compare_and_set(STORAGE_PREFIX, key, old_value, new_value)
|
||||
},
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn local_storage_get(&mut self, kind: StorageKind, key: &[u8]) -> Option<Vec<u8>> {
|
||||
match kind {
|
||||
StorageKind::PERSISTENT => self.db.get(STORAGE_PREFIX, key),
|
||||
StorageKind::LOCAL => unavailable_yet(LOCAL_DB),
|
||||
}
|
||||
}
|
||||
|
||||
fn http_request_start(
|
||||
&mut self,
|
||||
method: &str,
|
||||
@@ -270,16 +322,14 @@ pub(crate) struct AsyncApi {
|
||||
|
||||
impl AsyncApi {
|
||||
/// Creates new Offchain extensions API implementation an the asynchronous processing part.
|
||||
pub fn new<S: OffchainStorage>(
|
||||
db: S,
|
||||
pub fn new(
|
||||
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
|
||||
is_validator: bool,
|
||||
shared_client: SharedClient,
|
||||
) -> (Api<S>, Self) {
|
||||
) -> (Api, Self) {
|
||||
let (http_api, http_worker) = http::http(shared_client);
|
||||
|
||||
let api = Api {
|
||||
db,
|
||||
network_provider,
|
||||
is_validator,
|
||||
http: http_api,
|
||||
@@ -303,9 +353,10 @@ impl AsyncApi {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::{convert::{TryFrom, TryInto}, time::SystemTime};
|
||||
use sc_client_db::offchain::LocalStorage;
|
||||
use sc_network::{NetworkStateInfo, PeerId};
|
||||
use sp_core::offchain::{Externalities, DbExternalities};
|
||||
use std::{convert::{TryFrom, TryInto}, time::SystemTime};
|
||||
|
||||
struct TestNetwork();
|
||||
|
||||
@@ -329,20 +380,22 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn offchain_api() -> (Api<LocalStorage>, AsyncApi) {
|
||||
fn offchain_api() -> (Api, AsyncApi) {
|
||||
sp_tracing::try_init_simple();
|
||||
let db = LocalStorage::new_test();
|
||||
let mock = Arc::new(TestNetwork());
|
||||
let shared_client = SharedClient::new();
|
||||
|
||||
AsyncApi::new(
|
||||
db,
|
||||
mock,
|
||||
false,
|
||||
shared_client,
|
||||
)
|
||||
}
|
||||
|
||||
fn offchain_db() -> Db<LocalStorage> {
|
||||
Db::new(LocalStorage::new_test())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn should_get_timestamp() {
|
||||
let mut api = offchain_api().0;
|
||||
@@ -381,7 +434,7 @@ mod tests {
|
||||
fn should_set_and_get_local_storage() {
|
||||
// given
|
||||
let kind = StorageKind::PERSISTENT;
|
||||
let mut api = offchain_api().0;
|
||||
let mut api = offchain_db();
|
||||
let key = b"test";
|
||||
|
||||
// when
|
||||
@@ -396,7 +449,7 @@ mod tests {
|
||||
fn should_compare_and_set_local_storage() {
|
||||
// given
|
||||
let kind = StorageKind::PERSISTENT;
|
||||
let mut api = offchain_api().0;
|
||||
let mut api = offchain_db();
|
||||
let key = b"test";
|
||||
api.local_storage_set(kind, key, b"value");
|
||||
|
||||
@@ -413,7 +466,7 @@ mod tests {
|
||||
fn should_compare_and_set_local_storage_with_none() {
|
||||
// given
|
||||
let kind = StorageKind::PERSISTENT;
|
||||
let mut api = offchain_api().0;
|
||||
let mut api = offchain_db();
|
||||
let key = b"test";
|
||||
|
||||
// when
|
||||
|
||||
@@ -46,13 +46,13 @@ use sp_api::{ApiExt, ProvideRuntimeApi};
|
||||
use futures::future::Future;
|
||||
use log::{debug, warn};
|
||||
use sc_network::{ExHashT, NetworkService, NetworkStateInfo, PeerId};
|
||||
use sp_core::{offchain::{self, OffchainStorage}, ExecutionContext, traits::SpawnNamed};
|
||||
use sp_core::{offchain, ExecutionContext, traits::SpawnNamed};
|
||||
use sp_runtime::{generic::BlockId, traits::{self, Header}};
|
||||
use futures::{prelude::*, future::ready};
|
||||
|
||||
mod api;
|
||||
use api::SharedClient;
|
||||
|
||||
pub use api::Db as OffchainDb;
|
||||
pub use sp_offchain::{OffchainWorkerApi, STORAGE_PREFIX};
|
||||
|
||||
/// NetworkProvider provides [`OffchainWorkers`] with all necessary hooks into the
|
||||
@@ -80,21 +80,19 @@ where
|
||||
}
|
||||
|
||||
/// An offchain workers manager.
|
||||
pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
|
||||
pub struct OffchainWorkers<Client, Block: traits::Block> {
|
||||
client: Arc<Client>,
|
||||
db: Storage,
|
||||
_block: PhantomData<Block>,
|
||||
thread_pool: Mutex<ThreadPool>,
|
||||
shared_client: SharedClient,
|
||||
shared_client: api::SharedClient,
|
||||
}
|
||||
|
||||
impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
|
||||
impl<Client, Block: traits::Block> OffchainWorkers<Client, Block> {
|
||||
/// Creates new `OffchainWorkers`.
|
||||
pub fn new(client: Arc<Client>, db: Storage) -> Self {
|
||||
let shared_client = SharedClient::new();
|
||||
pub fn new(client: Arc<Client>) -> Self {
|
||||
let shared_client = api::SharedClient::new();
|
||||
Self {
|
||||
client,
|
||||
db,
|
||||
_block: PhantomData,
|
||||
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
|
||||
shared_client,
|
||||
@@ -102,9 +100,8 @@ impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Blo
|
||||
}
|
||||
}
|
||||
|
||||
impl<Client, Storage, Block: traits::Block> fmt::Debug for OffchainWorkers<
|
||||
impl<Client, Block: traits::Block> fmt::Debug for OffchainWorkers<
|
||||
Client,
|
||||
Storage,
|
||||
Block,
|
||||
> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
@@ -112,15 +109,13 @@ impl<Client, Storage, Block: traits::Block> fmt::Debug for OffchainWorkers<
|
||||
}
|
||||
}
|
||||
|
||||
impl<Client, Storage, Block> OffchainWorkers<
|
||||
impl<Client, Block> OffchainWorkers<
|
||||
Client,
|
||||
Storage,
|
||||
Block,
|
||||
> where
|
||||
Block: traits::Block,
|
||||
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
|
||||
Client::Api: OffchainWorkerApi<Block>,
|
||||
Storage: OffchainStorage + 'static,
|
||||
{
|
||||
/// Start the offchain workers after given block.
|
||||
#[must_use]
|
||||
@@ -150,7 +145,6 @@ impl<Client, Storage, Block> OffchainWorkers<
|
||||
debug!("Checking offchain workers at {:?}: version:{}", at, version);
|
||||
if version > 0 {
|
||||
let (api, runner) = api::AsyncApi::new(
|
||||
self.db.clone(),
|
||||
network_provider,
|
||||
is_validator,
|
||||
self.shared_client.clone(),
|
||||
@@ -197,10 +191,10 @@ impl<Client, Storage, Block> OffchainWorkers<
|
||||
}
|
||||
|
||||
/// Inform the offchain worker about new imported blocks
|
||||
pub async fn notification_future<Client, Storage, Block, Spawner>(
|
||||
pub async fn notification_future<Client, Block, Spawner>(
|
||||
is_validator: bool,
|
||||
client: Arc<Client>,
|
||||
offchain: Arc<OffchainWorkers<Client, Storage, Block>>,
|
||||
offchain: Arc<OffchainWorkers<Client, Block>>,
|
||||
spawner: Spawner,
|
||||
network_provider: Arc<dyn NetworkProvider + Send + Sync>,
|
||||
)
|
||||
@@ -208,7 +202,6 @@ pub async fn notification_future<Client, Storage, Block, Spawner>(
|
||||
Block: traits::Block,
|
||||
Client: ProvideRuntimeApi<Block> + sc_client_api::BlockchainEvents<Block> + Send + Sync + 'static,
|
||||
Client::Api: OffchainWorkerApi<Block>,
|
||||
Storage: OffchainStorage + 'static,
|
||||
Spawner: SpawnNamed
|
||||
{
|
||||
client.import_notification_stream().for_each(move |n| {
|
||||
@@ -300,12 +293,11 @@ mod tests {
|
||||
spawner,
|
||||
client.clone(),
|
||||
));
|
||||
let db = sc_client_db::offchain::LocalStorage::new_test();
|
||||
let network = Arc::new(TestNetwork());
|
||||
let header = client.header(&BlockId::number(0)).unwrap().unwrap();
|
||||
|
||||
// when
|
||||
let offchain = OffchainWorkers::new(client, db);
|
||||
let offchain = OffchainWorkers::new(client);
|
||||
futures::executor::block_on(
|
||||
offchain.on_block_imported(&header, network, false)
|
||||
);
|
||||
@@ -317,6 +309,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn offchain_index_set_and_clear_works() {
|
||||
use sp_core::offchain::OffchainStorage;
|
||||
|
||||
sp_tracing::try_init_simple();
|
||||
|
||||
let (client, backend) =
|
||||
|
||||
@@ -39,7 +39,7 @@ use futures::{
|
||||
channel::oneshot,
|
||||
};
|
||||
use sc_keystore::LocalKeystore;
|
||||
use log::{info, warn};
|
||||
use log::info;
|
||||
use sc_network::config::{Role, OnDemand};
|
||||
use sc_network::NetworkService;
|
||||
use sc_network::block_request_handler::{self, BlockRequestHandler};
|
||||
@@ -338,13 +338,17 @@ pub fn new_full_parts<TBl, TRtApi, TExecDisp>(
|
||||
transaction_storage: config.transaction_storage.clone(),
|
||||
};
|
||||
|
||||
|
||||
let backend = new_db_backend(db_config)?;
|
||||
|
||||
let extensions = sc_client_api::execution_extensions::ExecutionExtensions::new(
|
||||
config.execution_strategies.clone(),
|
||||
Some(keystore_container.sync_keystore()),
|
||||
sc_offchain::OffchainDb::factory_from_backend(&*backend),
|
||||
);
|
||||
|
||||
new_client(
|
||||
db_config,
|
||||
let client = new_client(
|
||||
backend.clone(),
|
||||
executor,
|
||||
chain_spec.as_storage_builder(),
|
||||
fork_blocks,
|
||||
@@ -357,7 +361,9 @@ pub fn new_full_parts<TBl, TRtApi, TExecDisp>(
|
||||
offchain_indexing_api: config.offchain_worker.indexing_enabled,
|
||||
wasm_runtime_overrides: config.wasm_runtime_overrides.clone(),
|
||||
},
|
||||
)?
|
||||
)?;
|
||||
|
||||
(client, backend)
|
||||
};
|
||||
|
||||
Ok((
|
||||
@@ -420,9 +426,20 @@ pub fn new_light_parts<TBl, TRtApi, TExecDisp>(
|
||||
Ok((client, backend, keystore_container, task_manager, on_demand))
|
||||
}
|
||||
|
||||
/// Create an instance of db-backed client.
|
||||
pub fn new_client<E, Block, RA>(
|
||||
/// Create an instance of default DB-backend backend.
|
||||
pub fn new_db_backend<Block>(
|
||||
settings: DatabaseSettings,
|
||||
) -> Result<Arc<Backend<Block>>, sp_blockchain::Error> where
|
||||
Block: BlockT,
|
||||
{
|
||||
const CANONICALIZATION_DELAY: u64 = 4096;
|
||||
|
||||
Ok(Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?))
|
||||
}
|
||||
|
||||
/// Create an instance of client backed by given backend.
|
||||
pub fn new_client<E, Block, RA>(
|
||||
backend: Arc<Backend<Block>>,
|
||||
executor: E,
|
||||
genesis_storage: &dyn BuildStorage,
|
||||
fork_blocks: ForkBlocks<Block>,
|
||||
@@ -431,38 +448,30 @@ pub fn new_client<E, Block, RA>(
|
||||
spawn_handle: Box<dyn SpawnNamed>,
|
||||
prometheus_registry: Option<Registry>,
|
||||
config: ClientConfig,
|
||||
) -> Result<(
|
||||
) -> Result<
|
||||
crate::client::Client<
|
||||
Backend<Block>,
|
||||
crate::client::LocalCallExecutor<Backend<Block>, E>,
|
||||
Block,
|
||||
RA,
|
||||
>,
|
||||
Arc<Backend<Block>>,
|
||||
),
|
||||
sp_blockchain::Error,
|
||||
>
|
||||
where
|
||||
Block: BlockT,
|
||||
E: CodeExecutor + RuntimeInfo,
|
||||
{
|
||||
const CANONICALIZATION_DELAY: u64 = 4096;
|
||||
|
||||
let backend = Arc::new(Backend::new(settings, CANONICALIZATION_DELAY)?);
|
||||
let executor = crate::client::LocalCallExecutor::new(backend.clone(), executor, spawn_handle, config.clone())?;
|
||||
Ok((
|
||||
crate::client::Client::new(
|
||||
backend.clone(),
|
||||
executor,
|
||||
genesis_storage,
|
||||
fork_blocks,
|
||||
bad_blocks,
|
||||
execution_extensions,
|
||||
prometheus_registry,
|
||||
config,
|
||||
)?,
|
||||
Ok(crate::client::Client::new(
|
||||
backend,
|
||||
))
|
||||
executor,
|
||||
genesis_storage,
|
||||
fork_blocks,
|
||||
bad_blocks,
|
||||
execution_extensions,
|
||||
prometheus_registry,
|
||||
config,
|
||||
)?)
|
||||
}
|
||||
|
||||
/// Parameters to pass into `build`.
|
||||
@@ -499,28 +508,18 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> {
|
||||
}
|
||||
|
||||
/// Build a shared offchain workers instance.
|
||||
pub fn build_offchain_workers<TBl, TBackend, TCl>(
|
||||
pub fn build_offchain_workers<TBl, TCl>(
|
||||
config: &Configuration,
|
||||
backend: Arc<TBackend>,
|
||||
spawn_handle: SpawnTaskHandle,
|
||||
client: Arc<TCl>,
|
||||
network: Arc<NetworkService<TBl, <TBl as BlockT>::Hash>>,
|
||||
) -> Option<Arc<sc_offchain::OffchainWorkers<TCl, TBackend::OffchainStorage, TBl>>>
|
||||
) -> Option<Arc<sc_offchain::OffchainWorkers<TCl, TBl>>>
|
||||
where
|
||||
TBl: BlockT, TBackend: sc_client_api::Backend<TBl>,
|
||||
<TBackend as sc_client_api::Backend<TBl>>::OffchainStorage: 'static,
|
||||
TBl: BlockT,
|
||||
TCl: Send + Sync + ProvideRuntimeApi<TBl> + BlockchainEvents<TBl> + 'static,
|
||||
<TCl as ProvideRuntimeApi<TBl>>::Api: sc_offchain::OffchainWorkerApi<TBl>,
|
||||
{
|
||||
let offchain_workers = match backend.offchain_storage() {
|
||||
Some(db) => {
|
||||
Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db)))
|
||||
},
|
||||
None => {
|
||||
warn!("Offchain workers disabled, due to lack of offchain storage support in backend.");
|
||||
None
|
||||
},
|
||||
};
|
||||
let offchain_workers = Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone())));
|
||||
|
||||
// Inform the offchain worker about new imported blocks
|
||||
if let Some(offchain) = offchain_workers.clone() {
|
||||
|
||||
@@ -205,7 +205,11 @@ pub fn new_with_backend<B, E, Block, S, RA>(
|
||||
B: backend::LocalBackend<Block> + 'static,
|
||||
{
|
||||
let call_executor = LocalCallExecutor::new(backend.clone(), executor, spawn_handle, config.clone())?;
|
||||
let extensions = ExecutionExtensions::new(Default::default(), keystore);
|
||||
let extensions = ExecutionExtensions::new(
|
||||
Default::default(),
|
||||
keystore,
|
||||
sc_offchain::OffchainDb::factory_from_backend(&*backend),
|
||||
);
|
||||
Client::new(
|
||||
backend,
|
||||
call_executor,
|
||||
|
||||
@@ -51,7 +51,7 @@ use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver}
|
||||
|
||||
pub use self::error::Error;
|
||||
pub use self::builder::{
|
||||
new_full_client, new_client, new_full_parts, new_light_parts,
|
||||
new_full_client, new_db_backend, new_client, new_full_parts, new_light_parts,
|
||||
spawn_tasks, build_network, build_offchain_workers,
|
||||
BuildNetworkParams, KeystoreContainer, NetworkStarter, SpawnTasksParams, TFullClient, TLightClient,
|
||||
TFullBackend, TLightBackend, TLightBackendWithHash, TLightClientWithBackend,
|
||||
|
||||
Reference in New Issue
Block a user