Rewrite the async code in BasicQueue (#7988)

* Rewrite the async code in `BasicQueue`

This is some smaller change to rewrite the async code in `BasicQueue`. I
require this for some other pr I'm working on ;)

* Update primitives/consensus/common/src/import_queue/basic_queue.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update primitives/consensus/common/src/import_queue/basic_queue.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Update primitives/consensus/common/src/import_queue/basic_queue.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Hmm :D

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
This commit is contained in:
Bastian Köcher
2021-01-26 16:38:57 +01:00
committed by GitHub
parent b1888395e8
commit e21a61eac8
@@ -15,11 +15,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{mem, pin::Pin, time::Duration, marker::PhantomData};
use std::{pin::Pin, time::Duration, marker::PhantomData};
use futures::{prelude::*, task::Context, task::Poll};
use futures_timer::Delay;
use sp_runtime::{Justification, traits::{Block as BlockT, Header as HeaderT, NumberFor}};
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded};
use sp_utils::mpsc::{TracingUnboundedSender, tracing_unbounded, TracingUnboundedReceiver};
use prometheus_endpoint::Registry;
use crate::{
@@ -146,16 +146,48 @@ mod worker_messages {
pub struct ImportJustification<B: BlockT>(pub Origin, pub B::Hash, pub NumberFor<B>, pub Justification);
}
struct BlockImportWorker<B: BlockT, Transaction> {
result_sender: BufferedLinkSender<B>,
justification_import: Option<BoxJustificationImport<B>>,
delay_between_blocks: Duration,
/// The process of importing blocks.
///
/// This polls the `block_import_receiver` for new blocks to import and than awaits on importing these blocks.
/// After each block is imported, this async function yields once to give other futures the possibility
/// to be run.
///
/// Returns when `block_import` ended.
async fn block_import_process<B: BlockT, Transaction: Send>(
mut block_import: BoxBlockImport<B, Transaction>,
mut verifier: impl Verifier<B>,
mut result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
metrics: Option<Metrics>,
_phantom: PhantomData<Transaction>,
delay_between_blocks: Duration,
) {
loop {
let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await {
Some(blocks) => blocks,
None => return,
};
let res = import_many_blocks(
&mut block_import,
origin,
blocks,
&mut verifier,
delay_between_blocks,
metrics.clone(),
).await;
result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
}
impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
fn new<V: 'static + Verifier<B>>(
struct BlockImportWorker<B: BlockT> {
result_sender: BufferedLinkSender<B>,
justification_import: Option<BoxJustificationImport<B>>,
metrics: Option<Metrics>,
}
impl<B: BlockT> BlockImportWorker<B> {
fn new<V: 'static + Verifier<B>, Transaction: Send>(
result_sender: BufferedLinkSender<B>,
verifier: V,
block_import: BoxBlockImport<B, Transaction>,
@@ -171,15 +203,13 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
let (justification_sender, mut justification_port) =
tracing_unbounded("mpsc_import_queue_worker_justification");
let (block_import_sender, mut block_import_port) =
let (block_import_sender, block_import_port) =
tracing_unbounded("mpsc_import_queue_worker_blocks");
let mut worker = BlockImportWorker {
result_sender,
justification_import,
delay_between_blocks: Duration::new(0, 0),
metrics,
_phantom: PhantomData,
};
// Let's initialize `justification_import`
@@ -189,93 +219,47 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
}
}
// The future below has two possible states:
//
// - Currently importing many blocks, in which case `importing` is `Some` and contains a
// `Future`, and `block_import` is `None`.
// - Something else, in which case `block_import` is `Some` and `importing` is None.
//
// Additionally, the task will prioritize processing of justification import messages over
// block import messages, hence why two distinct channels are used.
let mut block_import_verifier = Some((block_import, verifier));
let mut importing = None;
let delay_between_blocks = Duration::default();
let future = async move {
let block_import_process = block_import_process(
block_import,
verifier,
worker.result_sender.clone(),
block_import_port,
worker.metrics.clone(),
delay_between_blocks,
);
futures::pin_mut!(block_import_process);
let future = futures::future::poll_fn(move |cx| {
loop {
// If the results sender is closed, that means that the import queue is shutting
// down and we should end this future.
if worker.result_sender.is_closed() {
return Poll::Ready(())
return;
}
// Grab the next justification import request sent to the import queue.
match Stream::poll_next(Pin::new(&mut justification_port), cx) {
Poll::Ready(Some(ImportJustification(who, hash, number, justification))) => {
worker.import_justification(who, hash, number, justification);
continue;
},
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => {},
};
// If we are in the process of importing a bunch of blocks, let's resume this
// process before doing anything more.
if let Some(imp_fut) = importing.as_mut() {
match Future::poll(Pin::new(imp_fut), cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready((bi, verif)) => {
block_import_verifier = Some((bi, verif));
importing = None;
},
// Make sure to first process all justifications
while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
match justification {
Some(ImportJustification(who, hash, number, justification)) =>
worker.import_justification(who, hash, number, justification),
None => return,
}
}
debug_assert!(importing.is_none());
debug_assert!(block_import_verifier.is_some());
if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
return;
}
// Grab the next block import request sent to the import queue.
let ImportBlocks(origin, blocks) =
match Stream::poll_next(Pin::new(&mut block_import_port), cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
};
// On blocks import request, we merely *start* the process and store
// a `Future` into `importing`.
let (block_import, verifier) = block_import_verifier
.take()
.expect("block_import_verifier is always Some; qed");
importing = Some(worker.import_batch(block_import, verifier, origin, blocks));
// All futures that we polled are now pending.
futures::pending!()
}
});
};
(future, justification_sender, block_import_sender)
}
/// Returns a `Future` that imports the given blocks and sends the results on
/// `self.result_sender`.
///
/// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is
/// yielded back in the output once the import is finished.
fn import_batch<V: 'static + Verifier<B>>(
&mut self,
block_import: BoxBlockImport<B, Transaction>,
verifier: V,
origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
) -> impl Future<Output = (BoxBlockImport<B, Transaction>, V)> {
let mut result_sender = self.result_sender.clone();
let metrics = self.metrics.clone();
import_many_blocks(block_import, origin, blocks, verifier, self.delay_between_blocks, metrics)
.then(move |(imported, count, results, block_import, verifier)| {
result_sender.blocks_processed(imported, count, results);
future::ready((block_import, verifier))
})
}
fn import_justification(
&mut self,
who: Origin,
@@ -307,29 +291,27 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
}
}
/// Result of [`import_many_blocks`].
struct ImportManyBlocksResult<B: BlockT> {
/// The number of blocks imported successfully.
imported: usize,
/// The total number of blocks processed.
block_count: usize,
/// The import results for each block.
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>,
}
/// Import several blocks at once, returning import result for each block.
///
/// For lifetime reasons, the `BlockImport` implementation must be passed by value, and is yielded
/// back in the output once the import is finished.
///
/// The returned `Future` yields at every imported block, which makes the execution more
/// fine-grained and making it possible to interrupt the process.
fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
import_handle: BoxBlockImport<B, Transaction>,
/// This will yield after each imported block once, to ensure that other futures can be called as well.
async fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
import_handle: &mut BoxBlockImport<B, Transaction>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: V,
verifier: &mut V,
delay_between_blocks: Duration,
metrics: Option<Metrics>,
) -> impl Future<
Output = (
usize,
usize,
Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>,
BoxBlockImport<B, Transaction>,
V,
),
> {
) -> ImportManyBlocksResult<B> {
let count = blocks.len();
let blocks_range = match (
@@ -347,44 +329,18 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
let mut results = vec![];
let mut has_error = false;
let mut blocks = blocks.into_iter();
let mut import_handle = Some(import_handle);
let mut waiting = None;
let mut verifier = Some(verifier);
// Blocks in the response/drain should be in ascending order.
future::poll_fn(move |cx| {
// Handle the optional timer that makes us wait before the next import.
if let Some(waiting) = &mut waiting {
match Future::poll(Pin::new(waiting), cx) {
Poll::Ready(_) => {},
Poll::Pending => return Poll::Pending,
}
}
waiting = None;
loop {
// Is there any block left to import?
let block = match blocks.next() {
Some(b) => b,
None => {
// No block left to import, success!
let import_handle = import_handle.take()
.expect("Future polled again after it has finished (import handle is None)");
let verifier = verifier.take()
.expect("Future polled again after it has finished (verifier handle is None)");
let results = mem::replace(&mut results, Vec::new());
return Poll::Ready((imported, count, results, import_handle, verifier));
return ImportManyBlocksResult { block_count: count, imported, results }
},
};
// We extract the content of `import_handle` and `verifier` only when the future ends,
// therefore `import_handle` and `verifier` are always `Some` here. It is illegal to poll
// a `Future` again after it has ended.
let import_handle = import_handle.as_mut()
.expect("Future polled again after it has finished (import handle is None)");
let verifier = verifier.as_mut()
.expect("Future polled again after it has finished (verifier handle is None)");
let block_number = block.header.as_ref().map(|h| h.number().clone());
let block_hash = block.hash;
let import_result = if has_error {
@@ -392,7 +348,7 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
} else {
// The actual import.
import_single_block_metered(
&mut **import_handle,
import_handle,
blocks_origin.clone(),
block,
verifier,
@@ -405,7 +361,12 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
}
if import_result.is_ok() {
trace!(target: "sync", "Block imported successfully {:?} ({})", block_number, block_hash);
trace!(
target: "sync",
"Block imported successfully {:?} ({})",
block_number,
block_hash,
);
imported += 1;
} else {
has_error = true;
@@ -413,14 +374,40 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
results.push((import_result, block_hash));
// Notifies the current task again so that we re-execute this closure again for the next
// block.
if delay_between_blocks != Duration::new(0, 0) {
waiting = Some(Delay::new(delay_between_blocks));
if delay_between_blocks != Duration::default() && !has_error {
Delay::new(delay_between_blocks).await;
} else {
Yield::new().await
}
cx.waker().wake_by_ref();
Poll::Pending
})
}
}
/// A future that will always `yield` on the first call of `poll` but schedules the current task for
/// re-execution.
///
/// This is done by getting the waker and calling `wake_by_ref` followed by returning `Pending`.
/// The next time the `poll` is called, it will return `Ready`.
struct Yield(bool);
impl Yield {
fn new() -> Self {
Self(false)
}
}
impl Future for Yield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if !self.0 {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
}
}
#[cfg(test)]
@@ -517,8 +504,9 @@ mod tests {
fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link();
let (mut worker, mut finality_sender, mut block_import_sender) =
let (worker, mut finality_sender, mut block_import_sender) =
BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
futures::pin_mut!(worker);
let mut import_block = |n| {
let header = Header {