Import queue steps (#3159)

* Switch consensus-common to new futures

* Fix tests

* More tests fixing

* Pass the block_import by value when importing

* Rewrite the multi-blocks import as a future

* Add some comments

* Add a delay between each block

* Fix Babe tests

* Reverse white space change
This commit is contained in:
Pierre Krieger
2019-07-26 02:42:03 +02:00
committed by Gavin Wood
parent d98f3c6023
commit 5220ccbf20
2 changed files with 120 additions and 35 deletions
@@ -14,12 +14,12 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use std::{pin::Pin, sync::Arc};
use std::{mem, pin::Pin, sync::Arc, time::Duration};
use futures::{prelude::*, channel::mpsc, task::SpawnExt as _, task::Context, task::Poll};
use futures_timer::Delay;
use runtime_primitives::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
use crate::error::Error as ConsensusError;
use crate::block_import::{BlockImport, BlockOrigin};
use crate::block_import::BlockOrigin;
use crate::import_queue::{
BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport,
BoxJustificationImport, ImportQueue, Link, Origin,
@@ -136,10 +136,10 @@ enum ToWorkerMsg<B: BlockT> {
struct BlockImportWorker<B: BlockT, V: Verifier<B>> {
result_sender: BufferedLinkSender<B>,
block_import: BoxBlockImport<B>,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
verifier: Arc<V>,
delay_between_blocks: Duration,
}
impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
@@ -156,24 +156,55 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
result_sender,
verifier,
justification_import,
block_import,
finality_proof_import,
delay_between_blocks: Duration::new(0, 0),
};
// Let's initialize `justification_import` and `finality_proof_import`.
if let Some(justification_import) = worker.justification_import.as_mut() {
for (hash, number) in justification_import.on_start() {
worker.result_sender.request_justification(&hash, number);
}
}
if let Some(finality_proof_import) = worker.finality_proof_import.as_mut() {
for (hash, number) in finality_proof_import.on_start() {
worker.result_sender.request_finality_proof(&hash, number);
}
}
// The future below has two possible states:
//
// - Currently importing many blocks, in which case `importing` is `Some` and contains a
// `Future`, and `block_import` is `None`.
// - Something else, in which case `block_import` is `Some` and `importing` is None.
//
let mut block_import = Some(block_import);
let mut importing = None;
let future = futures::future::poll_fn(move |cx| {
loop {
// If the results sender is closed, that means that the import queue is shutting
// down and we should end this future.
if worker.result_sender.is_closed() {
return Poll::Ready(())
}
// If we are in the process of importing a bunch of block, let's resume this
// process before doing anything more.
if let Some(imp_fut) = importing.as_mut() {
match Future::poll(Pin::new(imp_fut), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(bi) => {
block_import = Some(bi);
importing = None;
},
}
}
debug_assert!(importing.is_none());
debug_assert!(block_import.is_some());
// Grab the next action request sent to the import queue.
let msg = match Stream::poll_next(Pin::new(&mut port), cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(()),
@@ -182,7 +213,10 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
match msg {
ToWorkerMsg::ImportBlocks(origin, blocks) => {
worker.import_a_batch_of_blocks(origin, blocks);
// On blocks import request, we merely *start* the process and store
// a `Future` into `importing`.
let bi = block_import.take().expect("block_import is always Some; qed");
importing = Some(worker.import_a_batch_of_blocks(bi, origin, blocks));
},
ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
worker.import_finality_proof(who, hash, number, proof);
@@ -197,17 +231,24 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
(future, sender)
}
fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
let result_sender = &self.result_sender;
let (imported, count, results) = import_many_blocks(
&mut *self.block_import,
origin,
blocks,
self.verifier.clone(),
|| !result_sender.is_closed(),
);
/// Returns a `Future` that imports the given blocks and sends the results on
/// `self.result_sender`.
///
/// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is
/// yielded back in the output once the import is finished.
fn import_a_batch_of_blocks(
&mut self,
block_import: BoxBlockImport<B>,
origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>
) -> impl Future<Output = BoxBlockImport<B>> {
let mut result_sender = self.result_sender.clone();
self.result_sender.blocks_processed(imported, count, results);
import_many_blocks(block_import, origin, blocks, self.verifier.clone(), self.delay_between_blocks)
.then(move |(imported, count, results, block_import)| {
result_sender.blocks_processed(imported, count, results);
future::ready(block_import)
})
}
fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
@@ -257,20 +298,22 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
/// Import several blocks at once, returning import result for each block.
///
/// The `keep_going` closure will be called regularly. If it returns false, then the function will
/// end prematurely.
/// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is yielded
/// back in the output once the import is finished.
///
/// The returned `Future` yields at every imported block, which makes the execution more
/// fine-grained and making it possible to interrupt the process.
fn import_many_blocks<B: BlockT, V: Verifier<B>>(
import_handle: &mut dyn BlockImport<B, Error = ConsensusError>,
import_handle: BoxBlockImport<B>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: Arc<V>,
keep_going: impl Fn() -> bool,
) -> (usize, usize, Vec<(
delay_between_blocks: Duration,
) -> impl Future<Output = (usize, usize, Vec<(
Result<BlockImportResult<NumberFor<B>>, BlockImportError>,
B::Hash,
)>) {
)>, BoxBlockImport<B>)> {
let count = blocks.len();
let mut imported = 0;
let blocks_range = match (
blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
@@ -283,38 +326,72 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>>(
trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range);
let mut imported = 0;
let mut results = vec![];
let mut has_error = false;
let mut blocks = blocks.into_iter();
let mut import_handle = Some(import_handle);
let mut waiting = None;
// Blocks in the response/drain should be in ascending order.
for block in blocks {
if !keep_going() {
// Setting `has_error` to true cancels the rest of the import.
has_error = true;
future::poll_fn(move |cx| {
// Handle the optional timer that makes us wait before the next import.
if let Some(waiting) = &mut waiting {
match Future::poll(Pin::new(waiting), cx) {
Poll::Ready(_) => {},
Poll::Pending => return Poll::Pending,
}
}
waiting = None;
// Is there any block left to import?
let block = match blocks.next() {
Some(b) => b,
None => {
// No block left to import, success!
let import_handle = import_handle.take()
.expect("Future polled again after it has finished");
let results = mem::replace(&mut results, Vec::new());
return Poll::Ready((imported, count, results, import_handle));
},
};
// We extract the content of `import_handle` only when the future ends, therefore
// `import_handle` is always `Some` here. It is illegal to poll a `Future` again after it
// has ended.
let import_handle = import_handle.as_mut()
.expect("Future polled again after it has finished");
let block_number = block.header.as_ref().map(|h| h.number().clone());
let block_hash = block.hash;
let import_result = if has_error {
Err(BlockImportError::Cancelled)
} else {
// The actual import.
import_single_block(
import_handle,
&mut **import_handle,
blocks_origin.clone(),
block,
verifier.clone(),
)
};
let was_ok = import_result.is_ok();
if was_ok {
if import_result.is_ok() {
trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash);
imported += 1;
} else {
has_error = true;
}
results.push((import_result, block_hash));
}
(imported, count, results)
results.push((import_result, block_hash));
// Notifies the current task again so that we re-execute this closure again for the next
// block.
if delay_between_blocks != Duration::new(0, 0) {
waiting = Some(Delay::new(delay_between_blocks));
}
cx.waker().wake_by_ref();
Poll::Pending
})
}
@@ -66,6 +66,14 @@ impl<B: BlockT> BufferedLinkSender<B> {
}
}
impl<B: BlockT> Clone for BufferedLinkSender<B> {
fn clone(&self) -> Self {
BufferedLinkSender {
tx: self.tx.clone(),
}
}
}
/// Internal buffered message.
enum BlockImportWorkerMsg<B: BlockT> {
BlocksProcessed(usize, usize, Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>),