Name all the tasks! (#6726)

* Remove any implementation of `Spawn` or `Executor` from our task executors

* Fix compilation

* Rename `SpawnBlockingExecutor`

* Update primitives/core/src/traits.rs

Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>

* Fix tests

Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com>
This commit is contained in:
Bastian Köcher
2020-07-26 14:56:17 +02:00
committed by GitHub
parent 2ec131142b
commit 9310f15ac2
43 changed files with 280 additions and 280 deletions
@@ -197,11 +197,11 @@ fn record_proof_works() {
None,
8,
);
execution_proof_check_on_trie_backend::<_, u64, _>(
execution_proof_check_on_trie_backend::<_, u64, _, _>(
&backend,
&mut overlay,
&executor,
sp_core::tasks::executor(),
sp_core::testing::TaskExecutor::new(),
"Core_execute_block",
&block.encode(),
&runtime_code,
+2
View File
@@ -39,6 +39,7 @@ sp-externalities = { version = "0.8.0-rc5", optional = true, path = "../external
sp-storage = { version = "2.0.0-rc5", default-features = false, path = "../storage" }
parity-util-mem = { version = "0.7.0", default-features = false, features = ["primitive-types"] }
futures = { version = "0.3.1", optional = true }
dyn-clonable = { version = "0.9.0", optional = true }
# full crypto
ed25519-dalek = { version = "1.0.0-pre.4", default-features = false, features = ["u64_backend", "alloc"], optional = true }
@@ -111,6 +112,7 @@ std = [
"futures",
"futures/thread-pool",
"libsecp256k1/std",
"dyn-clonable",
]
# This feature enables all crypto primitives for `no_std` builds like microcontrollers
-2
View File
@@ -72,8 +72,6 @@ mod changes_trie;
pub mod traits;
pub mod testing;
#[cfg(feature = "std")]
pub mod tasks;
#[cfg(feature = "std")]
pub mod vrf;
pub use self::hash::{H160, H256, H512, convert_hash};
-57
View File
@@ -1,57 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Module for low-level asynchronous processing.
use crate::traits::CloneableSpawn;
use futures::{executor, task};
/// Simple task executor.
///
/// Uses single thread for scheduling tasks. Can be cloned and used in
/// runtime host (implements `CloneableSpawn`).
#[derive(Debug, Clone)]
pub struct Executor {
pool: executor::ThreadPool,
}
impl Executor {
fn new() -> Self {
Self {
pool: executor::ThreadPool::builder().pool_size(1).create()
.expect("Failed to create task executor")
}
}
}
impl task::Spawn for Executor {
fn spawn_obj(&self, future: task::FutureObj<'static, ()>)
-> Result<(), task::SpawnError> {
self.pool.spawn_obj(future)
}
}
impl CloneableSpawn for Executor {
fn clone(&self) -> Box<dyn CloneableSpawn> {
Box::new(Clone::clone(self))
}
}
/// Create tasks executor.
pub fn executor() -> Box<dyn CloneableSpawn> {
Box::new(Executor::new())
}
+4 -4
View File
@@ -359,16 +359,16 @@ macro_rules! wasm_export_functions {
};
}
/// An executor that supports spawning blocking futures in tests.
/// A task executor that can be used in tests.
///
/// Internally this just wraps a `ThreadPool` with a pool size of `8`. This
/// should ensure that we have enough threads in tests for spawning blocking futures.
#[cfg(feature = "std")]
#[derive(Clone)]
pub struct SpawnBlockingExecutor(futures::executor::ThreadPool);
pub struct TaskExecutor(futures::executor::ThreadPool);
#[cfg(feature = "std")]
impl SpawnBlockingExecutor {
impl TaskExecutor {
/// Create a new instance of `Self`.
pub fn new() -> Self {
let mut builder = futures::executor::ThreadPoolBuilder::new();
@@ -377,7 +377,7 @@ impl SpawnBlockingExecutor {
}
#[cfg(feature = "std")]
impl crate::traits::SpawnNamed for SpawnBlockingExecutor {
impl crate::traits::SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_ok(future);
}
+16 -11
View File
@@ -352,26 +352,21 @@ impl CallInWasmExt {
}
}
/// Something that can spawn tasks and also can be cloned.
pub trait CloneableSpawn: futures::task::Spawn + Send + Sync {
/// Clone as heap-allocated handle.
fn clone(&self) -> Box<dyn CloneableSpawn>;
}
sp_externalities::decl_extension! {
/// Task executor extension.
pub struct TaskExecutorExt(Box<dyn CloneableSpawn>);
pub struct TaskExecutorExt(Box<dyn SpawnNamed>);
}
impl TaskExecutorExt {
/// New instance of task executor extension.
pub fn new(spawn_handle: Box<dyn CloneableSpawn>) -> Self {
Self(spawn_handle)
pub fn new(spawn_handle: impl SpawnNamed + Send + 'static) -> Self {
Self(Box::new(spawn_handle))
}
}
/// Something that can spawn futures (blocking and non-blocking) with am assigned name.
pub trait SpawnNamed {
/// Something that can spawn futures (blocking and non-blocking) with an assigned name.
#[dyn_clonable::clonable]
pub trait SpawnNamed: Clone + Send + Sync {
/// Spawn the given blocking future.
///
/// The given `name` is used to identify the future in tracing.
@@ -381,3 +376,13 @@ pub trait SpawnNamed {
/// The given `name` is used to identify the future in tracing.
fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>);
}
impl SpawnNamed for Box<dyn SpawnNamed> {
fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
(**self).spawn_blocking(name, future)
}
fn spawn(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
(**self).spawn(name, future)
}
}
@@ -108,9 +108,9 @@ pub struct Extensions {
}
impl std::fmt::Debug for Extensions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Extensions: ({})", self.extensions.len())
}
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Extensions: ({})", self.extensions.len())
}
}
impl Extensions {
+38 -30
View File
@@ -17,9 +17,9 @@
//! Batch/parallel verification.
use sp_core::{ed25519, sr25519, ecdsa, crypto::Pair, traits::CloneableSpawn};
use sp_core::{ed25519, sr25519, ecdsa, crypto::Pair, traits::SpawnNamed};
use std::sync::{Arc, atomic::{AtomicBool, Ordering as AtomicOrdering}};
use futures::{future::FutureExt, task::FutureObj, channel::oneshot};
use futures::{future::FutureExt, channel::oneshot};
#[derive(Debug, Clone)]
struct Sr25519BatchItem {
@@ -35,14 +35,14 @@ struct Sr25519BatchItem {
/// call `verify_and_clear to get a result. After that, batch verifier is ready for the
/// next batching job.
pub struct BatchVerifier {
scheduler: Box<dyn CloneableSpawn>,
scheduler: Box<dyn SpawnNamed>,
sr25519_items: Vec<Sr25519BatchItem>,
invalid: Arc<AtomicBool>,
pending_tasks: Vec<oneshot::Receiver<()>>,
}
impl BatchVerifier {
pub fn new(scheduler: Box<dyn CloneableSpawn>) -> Self {
pub fn new(scheduler: Box<dyn SpawnNamed>) -> Self {
BatchVerifier {
scheduler,
sr25519_items: Default::default(),
@@ -56,7 +56,9 @@ impl BatchVerifier {
/// Returns `false` if there was already an invalid verification or if
/// the verification could not be spawned.
fn spawn_verification_task(
&mut self, f: impl FnOnce() -> bool + Send + 'static,
&mut self,
f: impl FnOnce() -> bool + Send + 'static,
name: &'static str,
) -> bool {
// there is already invalid transaction encountered
if self.invalid.load(AtomicOrdering::Relaxed) { return false; }
@@ -65,7 +67,8 @@ impl BatchVerifier {
let (sender, receiver) = oneshot::channel();
self.pending_tasks.push(receiver);
self.scheduler.spawn_obj(FutureObj::new(
self.scheduler.spawn(
name,
async move {
if !f() {
invalid_clone.store(true, AtomicOrdering::Relaxed);
@@ -75,15 +78,10 @@ impl BatchVerifier {
log::warn!("Verification halted while result was pending");
invalid_clone.store(true, AtomicOrdering::Relaxed);
}
}.boxed()
))
.map_err(|_| {
log::debug!(
target: "runtime",
"Batch-verification returns false because failed to spawn background task.",
)
})
.is_ok()
}.boxed(),
);
true
}
/// Push ed25519 signature to verify.
@@ -96,7 +94,10 @@ impl BatchVerifier {
pub_key: ed25519::Public,
message: Vec<u8>,
) -> bool {
self.spawn_verification_task(move || ed25519::Pair::verify(&signature, &message, &pub_key))
self.spawn_verification_task(
move || ed25519::Pair::verify(&signature, &message, &pub_key),
"substrate_ed25519_verify",
)
}
/// Push sr25519 signature to verify.
@@ -114,7 +115,10 @@ impl BatchVerifier {
if self.sr25519_items.len() >= 128 {
let items = std::mem::take(&mut self.sr25519_items);
self.spawn_verification_task(move || Self::verify_sr25519_batch(items))
self.spawn_verification_task(
move || Self::verify_sr25519_batch(items),
"substrate_sr25519_verify",
)
} else {
true
}
@@ -130,7 +134,10 @@ impl BatchVerifier {
pub_key: ecdsa::Public,
message: Vec<u8>,
) -> bool {
self.spawn_verification_task(move || ecdsa::Pair::verify(&signature, &message, &pub_key))
self.spawn_verification_task(
move || ecdsa::Pair::verify(&signature, &message, &pub_key),
"substrate_ecdsa_verify",
)
}
fn verify_sr25519_batch(items: Vec<Sr25519BatchItem>) -> bool {
@@ -161,23 +168,24 @@ impl BatchVerifier {
if pending.len() > 0 {
let (sender, receiver) = std::sync::mpsc::channel();
if self.scheduler.spawn_obj(FutureObj::new(async move {
futures::future::join_all(pending).await;
sender.send(())
.expect("Channel never panics if receiver is live. \
Receiver is always live until received this data; qed. ");
}.boxed())).is_err() {
log::debug!(
self.scheduler.spawn(
"substrate_batch_verify_join",
async move {
futures::future::join_all(pending).await;
sender.send(())
.expect("Channel never panics if receiver is live. \
Receiver is always live until received this data; qed. ");
}.boxed(),
);
if receiver.recv().is_err() {
log::warn!(
target: "runtime",
"Batch-verification returns false because failed to spawn background task.",
"Haven't received async result from verification task. Returning false.",
);
return false;
}
if receiver.recv().is_err() {
log::warn!(target: "runtime", "Haven't received async result from verification task. Returning false.");
return false;
}
}
log::trace!(
+10 -5
View File
@@ -1206,9 +1206,10 @@ pub type SubstrateHostFunctions = (
#[cfg(test)]
mod tests {
use super::*;
use sp_core::map;
use sp_state_machine::BasicExternalities;
use sp_core::storage::Storage;
use sp_core::{
storage::Storage, map, traits::TaskExecutorExt, testing::TaskExecutor,
};
use std::any::TypeId;
#[test]
@@ -1274,7 +1275,9 @@ mod tests {
#[test]
fn batch_verify_start_finish_works() {
let mut ext = BasicExternalities::with_tasks_executor();
let mut ext = BasicExternalities::default();
ext.register_extension(TaskExecutorExt::new(TaskExecutor::new()));
ext.execute_with(|| {
crypto::start_batch_verify();
});
@@ -1290,7 +1293,8 @@ mod tests {
#[test]
fn long_sr25519_batching() {
let mut ext = BasicExternalities::with_tasks_executor();
let mut ext = BasicExternalities::default();
ext.register_extension(TaskExecutorExt::new(TaskExecutor::new()));
ext.execute_with(|| {
let pair = sr25519::Pair::generate_with_phrase(None).0;
crypto::start_batch_verify();
@@ -1320,7 +1324,8 @@ mod tests {
#[test]
fn batching_works() {
let mut ext = BasicExternalities::with_tasks_executor();
let mut ext = BasicExternalities::default();
ext.register_extension(TaskExecutorExt::new(TaskExecutor::new()));
ext.execute_with(|| {
// invalid ed25519 signature
crypto::start_batch_verify();
+5 -1
View File
@@ -871,7 +871,11 @@ mod tests {
#[test]
#[should_panic(expected = "Signature verification has not been called")]
fn batching_still_finishes_when_not_called_directly() {
let mut ext = sp_state_machine::BasicExternalities::with_tasks_executor();
let mut ext = sp_state_machine::BasicExternalities::default();
ext.register_extension(
sp_core::traits::TaskExecutorExt::new(sp_core::testing::TaskExecutor::new()),
);
ext.execute_with(|| {
let _batching = SignatureBatching::start();
sp_io::crypto::sr25519_verify(
@@ -33,7 +33,7 @@ use sp_core::{
};
use log::warn;
use codec::Encode;
use sp_externalities::Extensions;
use sp_externalities::{Extensions, Extension};
/// Simple Map-based Externalities impl.
#[derive(Debug)]
@@ -53,17 +53,6 @@ impl BasicExternalities {
Self::new(Storage::default())
}
/// New basic extternalities with tasks executor.
pub fn with_tasks_executor() -> Self {
let mut extensions = Extensions::default();
extensions.register(sp_core::traits::TaskExecutorExt(sp_core::tasks::executor()));
Self {
inner: Storage::default(),
extensions,
}
}
/// Insert key/value
pub fn insert(&mut self, k: StorageKey, v: StorageValue) -> Option<StorageValue> {
self.inner.top.insert(k, v)
@@ -107,6 +96,11 @@ impl BasicExternalities {
pub fn extensions(&mut self) -> &mut Extensions {
&mut self.extensions
}
/// Register an extension.
pub fn register_extension(&mut self, ext: impl Extension) {
self.extensions.register(ext);
}
}
impl PartialEq for BasicExternalities {
+26 -21
View File
@@ -26,7 +26,7 @@ use codec::{Decode, Encode, Codec};
use sp_core::{
offchain::storage::OffchainOverlayedChanges,
storage::ChildInfo, NativeOrEncoded, NeverNativeValue, hexdisplay::HexDisplay,
traits::{CodeExecutor, CallInWasmExt, RuntimeCode},
traits::{CodeExecutor, CallInWasmExt, RuntimeCode, SpawnNamed},
};
use sp_externalities::Extensions;
@@ -77,7 +77,6 @@ pub use trie_backend::TrieBackend;
pub use error::{Error, ExecutionError};
pub use in_memory_backend::new_in_mem;
pub use stats::{UsageInfo, UsageUnit, StateMachineStats};
pub use sp_core::traits::CloneableSpawn;
const PROOF_CLOSE_TRANSACTION: &str = "\
Closing a transaction that was started in this function. Client initiated transactions
@@ -233,7 +232,7 @@ impl<'a, B, H, N, Exec> StateMachine<'a, B, H, N, Exec> where
call_data: &'a [u8],
mut extensions: Extensions,
runtime_code: &'a RuntimeCode,
spawn_handle: Box<dyn CloneableSpawn>,
spawn_handle: impl SpawnNamed + Send + 'static,
) -> Self {
extensions.register(CallInWasmExt::new(exec.clone()));
extensions.register(sp_core::traits::TaskExecutorExt::new(spawn_handle));
@@ -463,11 +462,11 @@ impl<'a, B, H, N, Exec> StateMachine<'a, B, H, N, Exec> where
}
/// Prove execution using the given state backend, overlayed changes, and call executor.
pub fn prove_execution<B, H, N, Exec>(
pub fn prove_execution<B, H, N, Exec, Spawn>(
mut backend: B,
overlay: &mut OverlayedChanges,
exec: &Exec,
spawn_handle: Box<dyn CloneableSpawn>,
spawn_handle: Spawn,
method: &str,
call_data: &[u8],
runtime_code: &RuntimeCode,
@@ -478,10 +477,11 @@ where
H::Out: Ord + 'static + codec::Codec,
Exec: CodeExecutor + Clone + 'static,
N: crate::changes_trie::BlockNumber,
Spawn: SpawnNamed + Send + 'static,
{
let trie_backend = backend.as_trie_backend()
.ok_or_else(|| Box::new(ExecutionError::UnableToGenerateProof) as Box<dyn Error>)?;
prove_execution_on_trie_backend::<_, _, N, _>(
prove_execution_on_trie_backend::<_, _, N, _, _>(
trie_backend,
overlay,
exec,
@@ -501,11 +501,11 @@ where
///
/// Note: changes to code will be in place if this call is made again. For running partial
/// blocks (e.g. a transaction at a time), ensure a different method is used.
pub fn prove_execution_on_trie_backend<S, H, N, Exec>(
pub fn prove_execution_on_trie_backend<S, H, N, Exec, Spawn>(
trie_backend: &TrieBackend<S, H>,
overlay: &mut OverlayedChanges,
exec: &Exec,
spawn_handle: Box<dyn CloneableSpawn>,
spawn_handle: Spawn,
method: &str,
call_data: &[u8],
runtime_code: &RuntimeCode,
@@ -516,6 +516,7 @@ where
H::Out: Ord + 'static + codec::Codec,
Exec: CodeExecutor + 'static + Clone,
N: crate::changes_trie::BlockNumber,
Spawn: SpawnNamed + Send + 'static,
{
let mut offchain_overlay = OffchainOverlayedChanges::default();
let proving_backend = proving_backend::ProvingBackend::new(trie_backend);
@@ -541,12 +542,12 @@ where
}
/// Check execution proof, generated by `prove_execution` call.
pub fn execution_proof_check<H, N, Exec>(
pub fn execution_proof_check<H, N, Exec, Spawn>(
root: H::Out,
proof: StorageProof,
overlay: &mut OverlayedChanges,
exec: &Exec,
spawn_handle: Box<dyn CloneableSpawn>,
spawn_handle: Spawn,
method: &str,
call_data: &[u8],
runtime_code: &RuntimeCode,
@@ -556,9 +557,10 @@ where
Exec: CodeExecutor + Clone + 'static,
H::Out: Ord + 'static + codec::Codec,
N: crate::changes_trie::BlockNumber,
Spawn: SpawnNamed + Send + 'static,
{
let trie_backend = create_proof_check_backend::<H>(root.into(), proof)?;
execution_proof_check_on_trie_backend::<_, N, _>(
execution_proof_check_on_trie_backend::<_, N, _, _>(
&trie_backend,
overlay,
exec,
@@ -570,11 +572,11 @@ where
}
/// Check execution proof on proving backend, generated by `prove_execution` call.
pub fn execution_proof_check_on_trie_backend<H, N, Exec>(
pub fn execution_proof_check_on_trie_backend<H, N, Exec, Spawn>(
trie_backend: &TrieBackend<MemoryDB<H>, H>,
overlay: &mut OverlayedChanges,
exec: &Exec,
spawn_handle: Box<dyn CloneableSpawn>,
spawn_handle: Spawn,
method: &str,
call_data: &[u8],
runtime_code: &RuntimeCode,
@@ -584,6 +586,7 @@ where
H::Out: Ord + 'static + codec::Codec,
Exec: CodeExecutor + Clone + 'static,
N: crate::changes_trie::BlockNumber,
Spawn: SpawnNamed + Send + 'static,
{
let mut offchain_overlay = OffchainOverlayedChanges::default();
let mut sm = StateMachine::<_, H, N, Exec>::new(
@@ -765,7 +768,9 @@ mod tests {
use super::*;
use super::ext::Ext;
use super::changes_trie::Configuration as ChangesTrieConfig;
use sp_core::{map, traits::{Externalities, RuntimeCode}};
use sp_core::{
map, traits::{Externalities, RuntimeCode}, testing::TaskExecutor,
};
use sp_runtime::traits::BlakeTwo256;
#[derive(Clone)]
@@ -859,7 +864,7 @@ mod tests {
&[],
Default::default(),
&wasm_code,
sp_core::tasks::executor(),
TaskExecutor::new(),
);
assert_eq!(
@@ -891,7 +896,7 @@ mod tests {
&[],
Default::default(),
&wasm_code,
sp_core::tasks::executor(),
TaskExecutor::new(),
);
assert_eq!(state_machine.execute(ExecutionStrategy::NativeElseWasm).unwrap(), vec![66]);
@@ -920,7 +925,7 @@ mod tests {
&[],
Default::default(),
&wasm_code,
sp_core::tasks::executor(),
TaskExecutor::new(),
);
assert!(
@@ -947,23 +952,23 @@ mod tests {
// fetch execution proof from 'remote' full node
let remote_backend = trie_backend::tests::test_trie();
let remote_root = remote_backend.storage_root(std::iter::empty()).0;
let (remote_result, remote_proof) = prove_execution::<_, _, u64, _>(
let (remote_result, remote_proof) = prove_execution::<_, _, u64, _, _>(
remote_backend,
&mut Default::default(),
&executor,
sp_core::tasks::executor(),
TaskExecutor::new(),
"test",
&[],
&RuntimeCode::empty(),
).unwrap();
// check proof locally
let local_result = execution_proof_check::<BlakeTwo256, u64, _>(
let local_result = execution_proof_check::<BlakeTwo256, u64, _, _>(
remote_root,
remote_proof,
&mut Default::default(),
&executor,
sp_core::tasks::executor(),
TaskExecutor::new(),
"test",
&[],
&RuntimeCode::empty(),
@@ -39,6 +39,8 @@ use sp_core::{
well_known_keys::{CHANGES_TRIE_CONFIG, CODE, HEAP_PAGES, is_child_storage_key},
Storage,
},
traits::TaskExecutorExt,
testing::TaskExecutor,
};
use codec::Encode;
use sp_externalities::{Extensions, Extension};
@@ -109,8 +111,7 @@ impl<H: Hasher, N: ChangesTrieBlockNumber> TestExternalities<H, N>
let offchain_overlay = OffchainOverlayedChanges::enabled();
let mut extensions = Extensions::default();
extensions.register(sp_core::traits::TaskExecutorExt(sp_core::tasks::executor()));
extensions.register(TaskExecutorExt::new(TaskExecutor::new()));
let offchain_db = TestPersistentOffchainDB::new();