mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-29 02:07:56 +00:00
Move spawning tasks from thread pools to Service's TaskManager for block importing (#5647)
Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
@@ -14,10 +14,9 @@
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
use std::{mem, pin::Pin, time::Duration, marker::PhantomData, sync::Arc};
|
||||
use futures::{prelude::*, task::Context, task::Poll};
|
||||
use std::{mem, pin::Pin, time::Duration, marker::PhantomData};
|
||||
use futures::{prelude::*, task::Context, task::Poll, future::BoxFuture};
|
||||
use futures_timer::Delay;
|
||||
use parking_lot::{Mutex, Condvar};
|
||||
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
|
||||
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
|
||||
|
||||
@@ -36,31 +35,14 @@ pub struct BasicQueue<B: BlockT, Transaction> {
|
||||
sender: TracingUnboundedSender<ToWorkerMsg<B>>,
|
||||
/// Results coming from the worker task.
|
||||
result_port: BufferedLinkReceiver<B>,
|
||||
/// If it isn't possible to spawn the future in `future_to_spawn` (which is notably the case in
|
||||
/// "no std" environment), we instead put it in `manual_poll`. It is then polled manually from
|
||||
/// `poll_actions`.
|
||||
manual_poll: Option<Pin<Box<dyn Future<Output = ()> + Send>>>,
|
||||
/// A thread pool where the background worker is being run.
|
||||
pool: Option<futures::executor::ThreadPool>,
|
||||
pool_guard: Arc<(Mutex<usize>, Condvar)>,
|
||||
_phantom: PhantomData<Transaction>,
|
||||
}
|
||||
|
||||
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
|
||||
fn drop(&mut self) {
|
||||
self.pool = None;
|
||||
// Flush the queue and close the receiver to terminate the future.
|
||||
self.sender.close_channel();
|
||||
self.result_port.close();
|
||||
|
||||
// Make sure all pool threads terminate.
|
||||
// https://github.com/rust-lang/futures-rs/issues/1470
|
||||
// https://github.com/rust-lang/futures-rs/issues/1349
|
||||
let (ref mutex, ref condvar) = *self.pool_guard;
|
||||
let mut lock = mutex.lock();
|
||||
while *lock != 0 {
|
||||
condvar.wait(&mut lock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,6 +56,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
|
||||
block_import: BoxBlockImport<B, Transaction>,
|
||||
justification_import: Option<BoxJustificationImport<B>>,
|
||||
finality_proof_import: Option<BoxFinalityProofImport<B>>,
|
||||
spawner: impl Fn(BoxFuture<'static, ()>) -> (),
|
||||
) -> Self {
|
||||
let (result_sender, result_port) = buffered_link::buffered_link();
|
||||
let (future, worker_sender) = BlockImportWorker::new(
|
||||
@@ -84,39 +67,11 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
|
||||
finality_proof_import,
|
||||
);
|
||||
|
||||
let guard = Arc::new((Mutex::new(0usize), Condvar::new()));
|
||||
let guard_start = guard.clone();
|
||||
let guard_end = guard.clone();
|
||||
|
||||
let mut pool = futures::executor::ThreadPool::builder()
|
||||
.name_prefix("import-queue-worker-")
|
||||
.pool_size(1)
|
||||
.after_start(move |_| *guard_start.0.lock() += 1)
|
||||
.before_stop(move |_| {
|
||||
let (ref mutex, ref condvar) = *guard_end;
|
||||
let mut lock = mutex.lock();
|
||||
*lock -= 1;
|
||||
if *lock == 0 {
|
||||
condvar.notify_one();
|
||||
}
|
||||
})
|
||||
.create()
|
||||
.ok();
|
||||
|
||||
let manual_poll;
|
||||
if let Some(pool) = &mut pool {
|
||||
pool.spawn_ok(futures_diagnose::diagnose("import-queue", future));
|
||||
manual_poll = None;
|
||||
} else {
|
||||
manual_poll = Some(Box::pin(future) as Pin<Box<_>>);
|
||||
}
|
||||
spawner(future.boxed());
|
||||
|
||||
Self {
|
||||
sender: worker_sender,
|
||||
result_port,
|
||||
manual_poll,
|
||||
pool,
|
||||
pool_guard: guard,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
@@ -160,15 +115,6 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
|
||||
}
|
||||
|
||||
fn poll_actions(&mut self, cx: &mut Context, link: &mut dyn Link<B>) {
|
||||
// As a backup mechanism, if we failed to spawn the `future_to_spawn`, we instead poll
|
||||
// manually here.
|
||||
if let Some(manual_poll) = self.manual_poll.as_mut() {
|
||||
match Future::poll(Pin::new(manual_poll), cx) {
|
||||
Poll::Pending => {}
|
||||
_ => self.manual_poll = None,
|
||||
}
|
||||
}
|
||||
|
||||
self.result_port.poll_actions(cx, link);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user