use ThreadPool to execute spawn_worker(fn) (#3836)

* use ThreadPool to spawn_worker()

* use ThreadPool to implement spawn_worker(fn)

* use ThreadPool to implement spawn_worker(f)

* update [dependencies] threadpool and num_cpus version

*  rm 'extern crate num_cpus'

* cargo.lock update

*  merge the newest cargo.lock

* Update Cargo.lock

* use Mutex to wrap OffchainWorkers.thread_pool

* format use crate

* use parking_lot::Mutex instead of std::sync::Mutex
This commit is contained in:
CrocdileChan
2019-10-21 15:25:50 +08:00
committed by Tomasz Drwięga
parent 28f7814fb2
commit 60d232c727
3 changed files with 29 additions and 13 deletions
+16 -13
View File
@@ -39,6 +39,8 @@ use std::{
sync::Arc,
};
use parking_lot::Mutex;
use threadpool::ThreadPool;
use client::runtime_api::ApiExt;
use futures::future::Future;
use log::{debug, warn};
@@ -58,6 +60,7 @@ pub struct OffchainWorkers<Client, Storage, Block: traits::Block> {
client: Arc<Client>,
db: Storage,
_block: PhantomData<Block>,
thread_pool: Mutex<ThreadPool>,
}
impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Block> {
@@ -67,6 +70,7 @@ impl<Client, Storage, Block: traits::Block> OffchainWorkers<Client, Storage, Blo
client,
db,
_block: PhantomData,
thread_pool: Mutex::new(ThreadPool::new(num_cpus::get())),
}
}
}
@@ -116,7 +120,7 @@ impl<Client, Storage, Block> OffchainWorkers<
debug!("Spawning offchain workers at {:?}", at);
let number = *number;
let client = self.client.clone();
spawn_worker(move || {
self.spawn_worker(move || {
let runtime = client.runtime_api();
let api = Box::new(api);
debug!("Running offchain workers at {:?}", at);
@@ -134,19 +138,18 @@ impl<Client, Storage, Block> OffchainWorkers<
futures::future::Either::Right(futures::future::ready(()))
}
}
}
/// Spawns a new offchain worker.
///
/// We spawn offchain workers for each block in a separate thread,
/// since they can run for a significant amount of time
/// in a blocking fashion and we don't want to block the runtime.
///
/// Note that we should avoid that if we switch to future-based runtime in the future,
/// alternatively:
/// TODO [ToDr] (#1458) we can consider using a thread pool instead.
fn spawn_worker(f: impl FnOnce() -> () + Send + 'static) {
std::thread::spawn(f);
/// Spawns a new offchain worker.
///
/// We spawn offchain workers for each block in a separate thread,
/// since they can run for a significant amount of time
/// in a blocking fashion and we don't want to block the runtime.
///
/// Note that we should avoid that if we switch to future-based runtime in the future,
/// alternatively:
fn spawn_worker(&self, f: impl FnOnce() -> () + Send + 'static) {
self.thread_pool.lock().execute(f);
}
}
#[cfg(test)]