Run each offchain worker in a separate thread. (#3169)

* Run each offchain worker in a separate thread.

* Start a thread only on supported targets.

* Flip conditions.

* Remove platform-specific code.
This commit is contained in:
Tomasz Drwięga
2019-07-23 10:59:01 +02:00
committed by Bastian Köcher
parent af40c36873
commit be9e97afb2
+31 -5
View File
@@ -128,9 +128,9 @@ impl<Client, Storage, KeyProvider, Block> OffchainWorkers<
Block,
> where
Block: traits::Block,
Client: ProvideRuntimeApi,
Client: ProvideRuntimeApi + Send + Sync + 'static,
Client::Api: OffchainWorkerApi<Block>,
KeyProvider: AuthorityKeyProvider<Block>,
KeyProvider: AuthorityKeyProvider<Block> + Send,
Storage: client::backend::OffchainStorage + 'static,
{
/// Start the offchain workers after given block.
@@ -157,9 +157,22 @@ impl<Client, Storage, KeyProvider, Block> OffchainWorkers<
at.clone(),
network_state.clone(),
);
debug!("Running offchain workers at {:?}", at);
let api = Box::new(api);
runtime.offchain_worker_with_context(&at, ExecutionContext::OffchainWorker(api), *number).unwrap();
debug!("Spawning offchain workers at {:?}", at);
let number = *number;
let client = self.client.clone();
spawn_worker(move || {
let runtime = client.runtime_api();
let api = Box::new(api);
debug!("Running offchain workers at {:?}", at);
let run = runtime.offchain_worker_with_context(
&at,
ExecutionContext::OffchainWorker(api),
number
);
if let Err(e) = run {
log::error!("Error running offchain workers at {:?}: {:?}", at, e);
}
});
futures::future::Either::A(runner.process())
} else {
futures::future::Either::B(futures::future::ok(()))
@@ -167,6 +180,19 @@ impl<Client, Storage, KeyProvider, Block> OffchainWorkers<
}
}
/// Spawns a new offchain worker.
///
/// We spawn offchain workers for each block in a separate thread,
/// since they can run for a significant amount of time
/// in a blocking fashion and we don't want to block the runtime.
///
/// Note that we should avoid that if we switch to future-based runtime in the future,
/// alternatively:
/// TODO [ToDr] (#1458) we can consider using a thread pool instead.
fn spawn_worker(f: impl FnOnce() -> () + Send + 'static) {
std::thread::spawn(f);
}
#[cfg(test)]
mod tests {
use super::*;