mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-26 19:17:58 +00:00
Runtime worker threads (#7089)
* std variant * principal work * format and naming * format and naming continued * working nested fork * add comment * naming and tabs * line width * fix wording * address review * refactor dynamic dispatch * update wasmtime * some care * move ext * more refactor * doc effort * simplify * doc effort * tests and docs * address review * naming * explain some args * add example * unwinding for native and tests * rename stray * fix refs * fix tests * fix warnings * stray naming * fixes and comments * Update primitives/io/src/tasks.rs Co-authored-by: cheme <emericchevalier.pro@gmail.com> * make examples "compile" * dyn_dispatch -> spawn_call * fix impl * address review * Update primitives/io/src/lib.rs Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com> * Update primitives/io/src/tasks.rs Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com> * Update primitives/io/src/async_externalities.rs Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com> * Update primitives/io/src/tasks.rs Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com> * Update frame/example-parallel/src/lib.rs Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com> * fix compilation * Update client/executor/common/src/wasm_runtime.rs Co-authored-by: Sergei Shulepov <sergei@parity.io> * address review * Update client/executor/wasmtime/src/instance_wrapper.rs Co-authored-by: Sergei Shulepov <sergei@parity.io> * Update client/executor/src/native_executor.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update primitives/io/src/tasks.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update client/executor/src/native_executor.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update primitives/io/src/tasks.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * Update client/executor/wasmtime/src/instance_wrapper.rs Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com> * address some issues * address more issues * wasm_only interface * define sp_tasks * avoid anyhow * fix example Co-authored-by: cheme <emericchevalier.pro@gmail.com> Co-authored-by: Kian Paimani <5588131+kianenigma@users.noreply.github.com> Co-authored-by: Sergei Shulepov <sergei@parity.io> Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
@@ -20,15 +20,28 @@ use crate::{
|
||||
RuntimeInfo, error::{Error, Result},
|
||||
wasm_runtime::{RuntimeCache, WasmExecutionMethod},
|
||||
};
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
panic::{UnwindSafe, AssertUnwindSafe},
|
||||
result,
|
||||
sync::{Arc, atomic::{AtomicU64, Ordering}, mpsc},
|
||||
};
|
||||
|
||||
use sp_version::{NativeVersion, RuntimeVersion};
|
||||
use codec::{Decode, Encode};
|
||||
use sp_core::{
|
||||
NativeOrEncoded, traits::{CodeExecutor, Externalities, RuntimeCode, MissingHostFunctions},
|
||||
NativeOrEncoded,
|
||||
traits::{
|
||||
CodeExecutor, Externalities, RuntimeCode, MissingHostFunctions,
|
||||
RuntimeSpawnExt, RuntimeSpawn,
|
||||
},
|
||||
};
|
||||
use log::trace;
|
||||
use std::{result, panic::{UnwindSafe, AssertUnwindSafe}, sync::Arc};
|
||||
use sp_wasm_interface::{HostFunctions, Function};
|
||||
use sc_executor_common::wasm_runtime::WasmInstance;
|
||||
use sc_executor_common::wasm_runtime::{WasmInstance, WasmModule, InvokeMethod};
|
||||
use sp_externalities::ExternalitiesExt as _;
|
||||
use sp_tasks::new_async_externalities;
|
||||
|
||||
/// Default num of pages for the heap
|
||||
const DEFAULT_HEAP_PAGES: u64 = 1024;
|
||||
@@ -136,6 +149,7 @@ impl WasmExecutor {
|
||||
f: F,
|
||||
) -> Result<R>
|
||||
where F: FnOnce(
|
||||
AssertUnwindSafe<&Arc<dyn WasmModule>>,
|
||||
AssertUnwindSafe<&dyn WasmInstance>,
|
||||
Option<&RuntimeVersion>,
|
||||
AssertUnwindSafe<&mut dyn Externalities>,
|
||||
@@ -148,10 +162,11 @@ impl WasmExecutor {
|
||||
self.default_heap_pages,
|
||||
&*self.host_functions,
|
||||
allow_missing_host_functions,
|
||||
|instance, version, ext| {
|
||||
|module, instance, version, ext| {
|
||||
let module = AssertUnwindSafe(module);
|
||||
let instance = AssertUnwindSafe(instance);
|
||||
let ext = AssertUnwindSafe(ext);
|
||||
f(instance, version, ext)
|
||||
f(module, instance, version, ext)
|
||||
}
|
||||
)? {
|
||||
Ok(r) => r,
|
||||
@@ -179,10 +194,13 @@ impl sp_core::traits::CallInWasm for WasmExecutor {
|
||||
heap_pages: None,
|
||||
};
|
||||
|
||||
self.with_instance(&code, ext, allow_missing_host_functions, |instance, _, mut ext| {
|
||||
self.with_instance(&code, ext, allow_missing_host_functions, |module, instance, _, mut ext| {
|
||||
with_externalities_safe(
|
||||
&mut **ext,
|
||||
move || instance.call(method, call_data),
|
||||
move || {
|
||||
RuntimeInstanceSpawn::register_on_externalities(module.clone());
|
||||
instance.call_export(method, call_data)
|
||||
}
|
||||
)
|
||||
}).map_err(|e| e.to_string())
|
||||
} else {
|
||||
@@ -200,10 +218,14 @@ impl sp_core::traits::CallInWasm for WasmExecutor {
|
||||
|
||||
let instance = AssertUnwindSafe(instance);
|
||||
let mut ext = AssertUnwindSafe(ext);
|
||||
let module = AssertUnwindSafe(module);
|
||||
|
||||
with_externalities_safe(
|
||||
&mut **ext,
|
||||
move || instance.call(method, call_data),
|
||||
move || {
|
||||
RuntimeInstanceSpawn::register_on_externalities(module.clone());
|
||||
instance.call_export(method, call_data)
|
||||
}
|
||||
)
|
||||
.and_then(|r| r)
|
||||
.map_err(|e| e.to_string())
|
||||
@@ -269,12 +291,149 @@ impl<D: NativeExecutionDispatch> RuntimeInfo for NativeExecutor<D> {
|
||||
runtime_code,
|
||||
ext,
|
||||
false,
|
||||
|_instance, version, _ext|
|
||||
|_module, _instance, version, _ext|
|
||||
Ok(version.cloned().ok_or_else(|| Error::ApiError("Unknown version".into()))),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper inner struct to implement `RuntimeSpawn` extension.
|
||||
pub struct RuntimeInstanceSpawn {
|
||||
module: Arc<dyn WasmModule>,
|
||||
tasks: parking_lot::Mutex<HashMap<u64, mpsc::Receiver<Vec<u8>>>>,
|
||||
counter: AtomicU64,
|
||||
scheduler: Box<dyn sp_core::traits::SpawnNamed>,
|
||||
}
|
||||
|
||||
impl RuntimeSpawn for RuntimeInstanceSpawn {
|
||||
fn spawn_call(&self, dispatcher_ref: u32, func: u32, data: Vec<u8>) -> u64 {
|
||||
let new_handle = self.counter.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
self.tasks.lock().insert(new_handle, receiver);
|
||||
|
||||
let module = self.module.clone();
|
||||
let scheduler = self.scheduler.clone();
|
||||
self.scheduler.spawn("executor-extra-runtime-instance", Box::pin(async move {
|
||||
let module = AssertUnwindSafe(module);
|
||||
|
||||
let async_ext = match new_async_externalities(scheduler.clone()) {
|
||||
Ok(val) => val,
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
target: "executor",
|
||||
"Failed to setup externalities for async context: {}",
|
||||
e,
|
||||
);
|
||||
|
||||
// This will drop sender and receiver end will panic
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut async_ext = match async_ext.with_runtime_spawn(
|
||||
Box::new(RuntimeInstanceSpawn::new(module.clone(), scheduler))
|
||||
) {
|
||||
Ok(val) => val,
|
||||
Err(e) => {
|
||||
log::error!(
|
||||
target: "executor",
|
||||
"Failed to setup runtime extension for async externalities: {}",
|
||||
e,
|
||||
);
|
||||
|
||||
// This will drop sender and receiver end will panic
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let result = with_externalities_safe(
|
||||
&mut async_ext,
|
||||
move || {
|
||||
|
||||
// FIXME: Should be refactored to shared "instance factory".
|
||||
// Instantiating wasm here every time is suboptimal at the moment, shared
|
||||
// pool of instances should be used.
|
||||
//
|
||||
// https://github.com/paritytech/substrate/issues/7354
|
||||
let instance = module.new_instance()
|
||||
.expect("Failed to create new instance from module");
|
||||
|
||||
instance.call(
|
||||
InvokeMethod::TableWithWrapper { dispatcher_ref, func },
|
||||
&data[..],
|
||||
).expect("Failed to invoke instance.")
|
||||
}
|
||||
);
|
||||
|
||||
match result {
|
||||
Ok(output) => {
|
||||
let _ = sender.send(output);
|
||||
},
|
||||
Err(error) => {
|
||||
// If execution is panicked, the `join` in the original runtime code will panic as well,
|
||||
// since the sender is dropped without sending anything.
|
||||
log::error!("Call error in spawned task: {:?}", error);
|
||||
},
|
||||
}
|
||||
}));
|
||||
|
||||
|
||||
new_handle
|
||||
}
|
||||
|
||||
fn join(&self, handle: u64) -> Vec<u8> {
|
||||
let receiver = self.tasks.lock().remove(&handle).expect("No task for the handle");
|
||||
let output = receiver.recv().expect("Spawned task panicked for the handle");
|
||||
output
|
||||
}
|
||||
}
|
||||
|
||||
impl RuntimeInstanceSpawn {
|
||||
pub fn new(
|
||||
module: Arc<dyn WasmModule>,
|
||||
scheduler: Box<dyn sp_core::traits::SpawnNamed>,
|
||||
) -> Self {
|
||||
Self {
|
||||
module,
|
||||
scheduler,
|
||||
counter: 0.into(),
|
||||
tasks: HashMap::new().into(),
|
||||
}
|
||||
}
|
||||
|
||||
fn with_externalities_and_module(
|
||||
module: Arc<dyn WasmModule>,
|
||||
mut ext: &mut dyn Externalities,
|
||||
) -> Option<Self> {
|
||||
ext.extension::<sp_core::traits::TaskExecutorExt>()
|
||||
.map(move |task_ext| Self::new(module, task_ext.clone()))
|
||||
}
|
||||
|
||||
/// Register new `RuntimeSpawnExt` on current externalities.
|
||||
///
|
||||
/// This extensions will spawn instances from provided `module`.
|
||||
pub fn register_on_externalities(module: Arc<dyn WasmModule>) {
|
||||
sp_externalities::with_externalities(
|
||||
move |mut ext| {
|
||||
if let Some(runtime_spawn) =
|
||||
Self::with_externalities_and_module(module.clone(), ext)
|
||||
{
|
||||
if let Err(e) = ext.register_extension(
|
||||
RuntimeSpawnExt(Box::new(runtime_spawn))
|
||||
) {
|
||||
trace!(
|
||||
target: "executor",
|
||||
"Failed to register `RuntimeSpawnExt` instance on externalities: {:?}",
|
||||
e,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: NativeExecutionDispatch + 'static> CodeExecutor for NativeExecutor<D> {
|
||||
type Error = Error;
|
||||
|
||||
@@ -295,32 +454,34 @@ impl<D: NativeExecutionDispatch + 'static> CodeExecutor for NativeExecutor<D> {
|
||||
runtime_code,
|
||||
ext,
|
||||
false,
|
||||
|instance, onchain_version, mut ext| {
|
||||
|module, instance, onchain_version, mut ext| {
|
||||
let onchain_version = onchain_version.ok_or_else(
|
||||
|| Error::ApiError("Unknown version".into())
|
||||
)?;
|
||||
|
||||
let can_call_with = onchain_version.can_call_with(&self.native_version.runtime_version);
|
||||
|
||||
match (
|
||||
use_native,
|
||||
onchain_version.can_call_with(&self.native_version.runtime_version),
|
||||
can_call_with,
|
||||
native_call,
|
||||
) {
|
||||
(_, false, _) => {
|
||||
trace!(
|
||||
target: "executor",
|
||||
"Request for native execution failed (native: {}, chain: {})",
|
||||
self.native_version.runtime_version,
|
||||
onchain_version,
|
||||
);
|
||||
(_, false, _) | (false, _, _) => {
|
||||
if !can_call_with {
|
||||
trace!(
|
||||
target: "executor",
|
||||
"Request for native execution failed (native: {}, chain: {})",
|
||||
self.native_version.runtime_version,
|
||||
onchain_version,
|
||||
);
|
||||
}
|
||||
|
||||
with_externalities_safe(
|
||||
&mut **ext,
|
||||
move || instance.call(method, data).map(NativeOrEncoded::Encoded)
|
||||
)
|
||||
}
|
||||
(false, _, _) => {
|
||||
with_externalities_safe(
|
||||
&mut **ext,
|
||||
move || instance.call(method, data).map(NativeOrEncoded::Encoded)
|
||||
move || {
|
||||
RuntimeInstanceSpawn::register_on_externalities(module.clone());
|
||||
instance.call_export(method, data).map(NativeOrEncoded::Encoded)
|
||||
}
|
||||
)
|
||||
},
|
||||
(true, true, Some(call)) => {
|
||||
|
||||
Reference in New Issue
Block a user