diff --git a/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs b/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs index 03c0661f92..541c1ff0f4 100644 --- a/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/primitives/consensus/common/src/import_queue/basic_queue.rs @@ -15,11 +15,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{mem, pin::Pin, time::Duration, marker::PhantomData}; +use std::{pin::Pin, time::Duration, marker::PhantomData}; use futures::{prelude::*, task::Context, task::Poll}; use futures_timer::Delay; use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}}; -use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded}; +use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded, TracingUnboundedReceiver}; use prometheus_endpoint::Registry; use crate::{ @@ -146,16 +146,48 @@ mod worker_messages { pub struct ImportJustification(pub Origin, pub B::Hash, pub NumberFor, pub Justification); } -struct BlockImportWorker { - result_sender: BufferedLinkSender, - justification_import: Option>, - delay_between_blocks: Duration, +/// The process of importing blocks. +/// +/// This polls the `block_import_receiver` for new blocks to import and than awaits on importing these blocks. +/// After each block is imported, this async function yields once to give other futures the possibility +/// to be run. +/// +/// Returns when `block_import` ended. +async fn block_import_process( + mut block_import: BoxBlockImport, + mut verifier: impl Verifier, + mut result_sender: BufferedLinkSender, + mut block_import_receiver: TracingUnboundedReceiver>, metrics: Option, - _phantom: PhantomData, + delay_between_blocks: Duration, +) { + loop { + let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await { + Some(blocks) => blocks, + None => return, + }; + + let res = import_many_blocks( + &mut block_import, + origin, + blocks, + &mut verifier, + delay_between_blocks, + metrics.clone(), + ).await; + + result_sender.blocks_processed(res.imported, res.block_count, res.results); + } } -impl BlockImportWorker { - fn new>( +struct BlockImportWorker { + result_sender: BufferedLinkSender, + justification_import: Option>, + metrics: Option, +} + +impl BlockImportWorker { + fn new, Transaction: Send>( result_sender: BufferedLinkSender, verifier: V, block_import: BoxBlockImport, @@ -171,15 +203,13 @@ impl BlockImportWorker { let (justification_sender, mut justification_port) = tracing_unbounded("mpsc_import_queue_worker_justification"); - let (block_import_sender, mut block_import_port) = + let (block_import_sender, block_import_port) = tracing_unbounded("mpsc_import_queue_worker_blocks"); let mut worker = BlockImportWorker { result_sender, justification_import, - delay_between_blocks: Duration::new(0, 0), metrics, - _phantom: PhantomData, }; // Let's initialize `justification_import` @@ -189,93 +219,47 @@ impl BlockImportWorker { } } - // The future below has two possible states: - // - // - Currently importing many blocks, in which case `importing` is `Some` and contains a - // `Future`, and `block_import` is `None`. - // - Something else, in which case `block_import` is `Some` and `importing` is None. - // - // Additionally, the task will prioritize processing of justification import messages over - // block import messages, hence why two distinct channels are used. - let mut block_import_verifier = Some((block_import, verifier)); - let mut importing = None; + let delay_between_blocks = Duration::default(); + + let future = async move { + let block_import_process = block_import_process( + block_import, + verifier, + worker.result_sender.clone(), + block_import_port, + worker.metrics.clone(), + delay_between_blocks, + ); + futures::pin_mut!(block_import_process); - let future = futures::future::poll_fn(move |cx| { loop { // If the results sender is closed, that means that the import queue is shutting // down and we should end this future. if worker.result_sender.is_closed() { - return Poll::Ready(()) + return; } - // Grab the next justification import request sent to the import queue. - match Stream::poll_next(Pin::new(&mut justification_port), cx) { - Poll::Ready(Some(ImportJustification(who, hash, number, justification))) => { - worker.import_justification(who, hash, number, justification); - continue; - }, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => {}, - }; - - // If we are in the process of importing a bunch of blocks, let's resume this - // process before doing anything more. - if let Some(imp_fut) = importing.as_mut() { - match Future::poll(Pin::new(imp_fut), cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready((bi, verif)) => { - block_import_verifier = Some((bi, verif)); - importing = None; - }, + // Make sure to first process all justifications + while let Poll::Ready(justification) = futures::poll!(justification_port.next()) { + match justification { + Some(ImportJustification(who, hash, number, justification)) => + worker.import_justification(who, hash, number, justification), + None => return, } } - debug_assert!(importing.is_none()); - debug_assert!(block_import_verifier.is_some()); + if let Poll::Ready(()) = futures::poll!(&mut block_import_process) { + return; + } - // Grab the next block import request sent to the import queue. - let ImportBlocks(origin, blocks) = - match Stream::poll_next(Pin::new(&mut block_import_port), cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => return Poll::Pending, - }; - - // On blocks import request, we merely *start* the process and store - // a `Future` into `importing`. - let (block_import, verifier) = block_import_verifier - .take() - .expect("block_import_verifier is always Some; qed"); - - importing = Some(worker.import_batch(block_import, verifier, origin, blocks)); + // All futures that we polled are now pending. + futures::pending!() } - }); + }; (future, justification_sender, block_import_sender) } - /// Returns a `Future` that imports the given blocks and sends the results on - /// `self.result_sender`. - /// - /// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is - /// yielded back in the output once the import is finished. - fn import_batch>( - &mut self, - block_import: BoxBlockImport, - verifier: V, - origin: BlockOrigin, - blocks: Vec>, - ) -> impl Future, V)> { - let mut result_sender = self.result_sender.clone(); - let metrics = self.metrics.clone(); - - import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks, metrics) - .then(move |(imported, count, results, block_import, verifier)| { - result_sender.blocks_processed(imported, count, results); - future::ready((block_import, verifier)) - }) - } - fn import_justification( &mut self, who: Origin, @@ -307,29 +291,27 @@ impl BlockImportWorker { } } +/// Result of [`import_many_blocks`]. +struct ImportManyBlocksResult { + /// The number of blocks imported successfully. + imported: usize, + /// The total number of blocks processed. + block_count: usize, + /// The import results for each block. + results: Vec<(Result>, BlockImportError>, B::Hash)>, +} + /// Import several blocks at once, returning import result for each block. /// -/// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is yielded -/// back in the output once the import is finished. -/// -/// The returned `Future` yields at every imported block, which makes the execution more -/// fine-grained and making it possible to interrupt the process. -fn import_many_blocks, Transaction>( - import_handle: BoxBlockImport, +/// This will yield after each imported block once, to ensure that other futures can be called as well. +async fn import_many_blocks, Transaction>( + import_handle: &mut BoxBlockImport, blocks_origin: BlockOrigin, blocks: Vec>, - verifier: V, + verifier: &mut V, delay_between_blocks: Duration, metrics: Option, -) -> impl Future< - Output = ( - usize, - usize, - Vec<(Result>, BlockImportError>, B::Hash)>, - BoxBlockImport, - V, - ), -> { +) -> ImportManyBlocksResult { let count = blocks.len(); let blocks_range = match ( @@ -347,44 +329,18 @@ fn import_many_blocks, Transaction>( let mut results = vec![]; let mut has_error = false; let mut blocks = blocks.into_iter(); - let mut import_handle = Some(import_handle); - let mut waiting = None; - let mut verifier = Some(verifier); // Blocks in the response/drain should be in ascending order. - - future::poll_fn(move |cx| { - // Handle the optional timer that makes us wait before the next import. - if let Some(waiting) = &mut waiting { - match Future::poll(Pin::new(waiting), cx) { - Poll::Ready(_) => {}, - Poll::Pending => return Poll::Pending, - } - } - waiting = None; - + loop { // Is there any block left to import? let block = match blocks.next() { Some(b) => b, None => { // No block left to import, success! - let import_handle = import_handle.take() - .expect("Future polled again after it has finished (import handle is None)"); - let verifier = verifier.take() - .expect("Future polled again after it has finished (verifier handle is None)"); - let results = mem::replace(&mut results, Vec::new()); - return Poll::Ready((imported, count, results, import_handle, verifier)); + return ImportManyBlocksResult { block_count: count, imported, results } }, }; - // We extract the content of `import_handle` and `verifier` only when the future ends, - // therefore `import_handle` and `verifier` are always `Some` here. It is illegal to poll - // a `Future` again after it has ended. - let import_handle = import_handle.as_mut() - .expect("Future polled again after it has finished (import handle is None)"); - let verifier = verifier.as_mut() - .expect("Future polled again after it has finished (verifier handle is None)"); - let block_number = block.header.as_ref().map(|h| h.number().clone()); let block_hash = block.hash; let import_result = if has_error { @@ -392,7 +348,7 @@ fn import_many_blocks, Transaction>( } else { // The actual import. import_single_block_metered( - &mut **import_handle, + import_handle, blocks_origin.clone(), block, verifier, @@ -405,7 +361,12 @@ fn import_many_blocks, Transaction>( } if import_result.is_ok() { - trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash); + trace!( + target: "sync", + "Block imported successfully {:?} ({})", + block_number, + block_hash, + ); imported += 1; } else { has_error = true; @@ -413,14 +374,40 @@ fn import_many_blocks, Transaction>( results.push((import_result, block_hash)); - // Notifies the current task again so that we re-execute this closure again for the next - // block. - if delay_between_blocks != Duration::new(0, 0) { - waiting = Some(Delay::new(delay_between_blocks)); + if delay_between_blocks != Duration::default() && !has_error { + Delay::new(delay_between_blocks).await; + } else { + Yield::new().await } - cx.waker().wake_by_ref(); - Poll::Pending - }) + } +} + +/// A future that will always `yield` on the first call of `poll` but schedules the current task for +/// re-execution. + +/// +/// This is done by getting the waker and calling `wake_by_ref` followed by returning `Pending`. +/// The next time the `poll` is called, it will return `Ready`. +struct Yield(bool); + +impl Yield { + fn new() -> Self { + Self(false) + } +} + +impl Future for Yield { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if !self.0 { + self.0 = true; + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(()) + } + } } #[cfg(test)] @@ -517,8 +504,9 @@ mod tests { fn prioritizes_finality_work_over_block_import() { let (result_sender, mut result_port) = buffered_link::buffered_link(); - let (mut worker, mut finality_sender, mut block_import_sender) = + let (worker, mut finality_sender, mut block_import_sender) = BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None); + futures::pin_mut!(worker); let mut import_block = |n| { let header = Header {