From be9e97afb28956a4f9207222c795214299799a99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 23 Jul 2019 10:59:01 +0200 Subject: [PATCH] Run each offchain worker in a separate thread. (#3169) * Run each offchain worker in a separate thread. * Start a thread only on supported targets. * Flip conditions. * Remove platform-specific code. --- substrate/core/offchain/src/lib.rs | 36 +++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/substrate/core/offchain/src/lib.rs b/substrate/core/offchain/src/lib.rs index e187bbc0e3..d4947e271f 100644 --- a/substrate/core/offchain/src/lib.rs +++ b/substrate/core/offchain/src/lib.rs @@ -128,9 +128,9 @@ impl OffchainWorkers< Block, > where Block: traits::Block, - Client: ProvideRuntimeApi, + Client: ProvideRuntimeApi + Send + Sync + 'static, Client::Api: OffchainWorkerApi, - KeyProvider: AuthorityKeyProvider, + KeyProvider: AuthorityKeyProvider + Send, Storage: client::backend::OffchainStorage + 'static, { /// Start the offchain workers after given block. @@ -157,9 +157,22 @@ impl OffchainWorkers< at.clone(), network_state.clone(), ); - debug!("Running offchain workers at {:?}", at); - let api = Box::new(api); - runtime.offchain_worker_with_context(&at, ExecutionContext::OffchainWorker(api), *number).unwrap(); + debug!("Spawning offchain workers at {:?}", at); + let number = *number; + let client = self.client.clone(); + spawn_worker(move || { + let runtime = client.runtime_api(); + let api = Box::new(api); + debug!("Running offchain workers at {:?}", at); + let run = runtime.offchain_worker_with_context( + &at, + ExecutionContext::OffchainWorker(api), + number + ); + if let Err(e) = run { + log::error!("Error running offchain workers at {:?}: {:?}", at, e); + } + }); futures::future::Either::A(runner.process()) } else { futures::future::Either::B(futures::future::ok(())) @@ -167,6 +180,19 @@ impl OffchainWorkers< } } +/// Spawns a new offchain worker. +/// +/// We spawn offchain workers for each block in a separate thread, +/// since they can run for a significant amount of time +/// in a blocking fashion and we don't want to block the runtime. +/// +/// Note that we should avoid that if we switch to future-based runtime in the future, +/// alternatively: +/// TODO [ToDr] (#1458) we can consider using a thread pool instead. +fn spawn_worker(f: impl FnOnce() -> () + Send + 'static) { + std::thread::spawn(f); +} + #[cfg(test)] mod tests { use super::*;