diff --git a/substrate/core/consensus/common/src/import_queue/basic_queue.rs b/substrate/core/consensus/common/src/import_queue/basic_queue.rs index 082006ae5d..d94c3a383c 100644 --- a/substrate/core/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/core/consensus/common/src/import_queue/basic_queue.rs @@ -14,12 +14,12 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::{pin::Pin, sync::Arc}; +use std::{mem, pin::Pin, sync::Arc, time::Duration}; use futures::{prelude::*, channel::mpsc, task::SpawnExt as _, task::Context, task::Poll}; +use futures_timer::Delay; use runtime_primitives::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}}; -use crate::error::Error as ConsensusError; -use crate::block_import::{BlockImport, BlockOrigin}; +use crate::block_import::BlockOrigin; use crate::import_queue::{ BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport, BoxJustificationImport, ImportQueue, Link, Origin, @@ -136,10 +136,10 @@ enum ToWorkerMsg { struct BlockImportWorker> { result_sender: BufferedLinkSender, - block_import: BoxBlockImport, justification_import: Option>, finality_proof_import: Option>, verifier: Arc, + delay_between_blocks: Duration, } impl> BlockImportWorker { @@ -156,24 +156,55 @@ impl> BlockImportWorker { result_sender, verifier, justification_import, - block_import, finality_proof_import, + delay_between_blocks: Duration::new(0, 0), }; + // Let's initialize `justification_import` and `finality_proof_import`. if let Some(justification_import) = worker.justification_import.as_mut() { for (hash, number) in justification_import.on_start() { worker.result_sender.request_justification(&hash, number); } } - if let Some(finality_proof_import) = worker.finality_proof_import.as_mut() { for (hash, number) in finality_proof_import.on_start() { worker.result_sender.request_finality_proof(&hash, number); } } + // The future below has two possible states: + // + // - Currently importing many blocks, in which case `importing` is `Some` and contains a + // `Future`, and `block_import` is `None`. + // - Something else, in which case `block_import` is `Some` and `importing` is None. + // + let mut block_import = Some(block_import); + let mut importing = None; + let future = futures::future::poll_fn(move |cx| { loop { + // If the results sender is closed, that means that the import queue is shutting + // down and we should end this future. + if worker.result_sender.is_closed() { + return Poll::Ready(()) + } + + // If we are in the process of importing a bunch of block, let's resume this + // process before doing anything more. + if let Some(imp_fut) = importing.as_mut() { + match Future::poll(Pin::new(imp_fut), cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(bi) => { + block_import = Some(bi); + importing = None; + }, + } + } + + debug_assert!(importing.is_none()); + debug_assert!(block_import.is_some()); + + // Grab the next action request sent to the import queue. let msg = match Stream::poll_next(Pin::new(&mut port), cx) { Poll::Ready(Some(msg)) => msg, Poll::Ready(None) => return Poll::Ready(()), @@ -182,7 +213,10 @@ impl> BlockImportWorker { match msg { ToWorkerMsg::ImportBlocks(origin, blocks) => { - worker.import_a_batch_of_blocks(origin, blocks); + // On blocks import request, we merely *start* the process and store + // a `Future` into `importing`. + let bi = block_import.take().expect("block_import is always Some; qed"); + importing = Some(worker.import_a_batch_of_blocks(bi, origin, blocks)); }, ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => { worker.import_finality_proof(who, hash, number, proof); @@ -197,17 +231,24 @@ impl> BlockImportWorker { (future, sender) } - fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { - let result_sender = &self.result_sender; - let (imported, count, results) = import_many_blocks( - &mut *self.block_import, - origin, - blocks, - self.verifier.clone(), - || !result_sender.is_closed(), - ); + /// Returns a `Future` that imports the given blocks and sends the results on + /// `self.result_sender`. + /// + /// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is + /// yielded back in the output once the import is finished. + fn import_a_batch_of_blocks( + &mut self, + block_import: BoxBlockImport, + origin: BlockOrigin, + blocks: Vec> + ) -> impl Future> { + let mut result_sender = self.result_sender.clone(); - self.result_sender.blocks_processed(imported, count, results); + import_many_blocks(block_import, origin, blocks, self.verifier.clone(), self.delay_between_blocks) + .then(move |(imported, count, results, block_import)| { + result_sender.blocks_processed(imported, count, results); + future::ready(block_import) + }) } fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { @@ -257,20 +298,22 @@ impl> BlockImportWorker { /// Import several blocks at once, returning import result for each block. /// -/// The `keep_going` closure will be called regularly. If it returns false, then the function will -/// end prematurely. +/// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is yielded +/// back in the output once the import is finished. +/// +/// The returned `Future` yields at every imported block, which makes the execution more +/// fine-grained and making it possible to interrupt the process. fn import_many_blocks>( - import_handle: &mut dyn BlockImport, + import_handle: BoxBlockImport, blocks_origin: BlockOrigin, blocks: Vec>, verifier: Arc, - keep_going: impl Fn() -> bool, -) -> (usize, usize, Vec<( + delay_between_blocks: Duration, +) -> impl Future>, BlockImportError>, B::Hash, -)>) { +)>, BoxBlockImport)> { let count = blocks.len(); - let mut imported = 0; let blocks_range = match ( blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), @@ -283,38 +326,72 @@ fn import_many_blocks>( trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range); + let mut imported = 0; let mut results = vec![]; - let mut has_error = false; + let mut blocks = blocks.into_iter(); + let mut import_handle = Some(import_handle); + let mut waiting = None; // Blocks in the response/drain should be in ascending order. - for block in blocks { - if !keep_going() { - // Setting `has_error` to true cancels the rest of the import. - has_error = true; + + future::poll_fn(move |cx| { + // Handle the optional timer that makes us wait before the next import. + if let Some(waiting) = &mut waiting { + match Future::poll(Pin::new(waiting), cx) { + Poll::Ready(_) => {}, + Poll::Pending => return Poll::Pending, + } } + waiting = None; + + // Is there any block left to import? + let block = match blocks.next() { + Some(b) => b, + None => { + // No block left to import, success! + let import_handle = import_handle.take() + .expect("Future polled again after it has finished"); + let results = mem::replace(&mut results, Vec::new()); + return Poll::Ready((imported, count, results, import_handle)); + }, + }; + + // We extract the content of `import_handle` only when the future ends, therefore + // `import_handle` is always `Some` here. It is illegal to poll a `Future` again after it + // has ended. + let import_handle = import_handle.as_mut() + .expect("Future polled again after it has finished"); let block_number = block.header.as_ref().map(|h| h.number().clone()); let block_hash = block.hash; let import_result = if has_error { Err(BlockImportError::Cancelled) } else { + // The actual import. import_single_block( - import_handle, + &mut **import_handle, blocks_origin.clone(), block, verifier.clone(), ) }; - let was_ok = import_result.is_ok(); - if was_ok { + + if import_result.is_ok() { trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash); imported += 1; } else { has_error = true; } - results.push((import_result, block_hash)); - } - (imported, count, results) + results.push((import_result, block_hash)); + + // Notifies the current task again so that we re-execute this closure again for the next + // block. + if delay_between_blocks != Duration::new(0, 0) { + waiting = Some(Delay::new(delay_between_blocks)); + } + cx.waker().wake_by_ref(); + Poll::Pending + }) } diff --git a/substrate/core/consensus/common/src/import_queue/buffered_link.rs b/substrate/core/consensus/common/src/import_queue/buffered_link.rs index ffd08e690a..90921bd1ea 100644 --- a/substrate/core/consensus/common/src/import_queue/buffered_link.rs +++ b/substrate/core/consensus/common/src/import_queue/buffered_link.rs @@ -66,6 +66,14 @@ impl BufferedLinkSender { } } +impl Clone for BufferedLinkSender { + fn clone(&self) -> Self { + BufferedLinkSender { + tx: self.tx.clone(), + } + } +} + /// Internal buffered message. enum BlockImportWorkerMsg { BlocksProcessed(usize, usize, Vec<(Result>, BlockImportError>, B::Hash)>),