consensus: prioritize finality work over block import in queue (#7307)

* consensus: prioritize finality work over block import in queue

* consensus: add test for import queue task priority
This commit is contained in:
André Silva
2020-10-16 15:22:50 +01:00
committed by GitHub
parent 385c4ddf69
commit 463ba54d00
2 changed files with 281 additions and 57 deletions
@@ -36,8 +36,10 @@ use crate::{
/// Interface to a basic block import queue that is importing blocks sequentially in a separate
/// task, with plugable verification.
pub struct BasicQueue<B: BlockT, Transaction> {
/// Channel to send messages to the background task.
sender: TracingUnboundedSender<ToWorkerMsg<B>>,
/// Channel to send finality work messages to the background task.
finality_sender: TracingUnboundedSender<worker_messages::Finality<B>>,
/// Channel to send block import messages to the background task.
block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
/// Results coming from the worker task.
result_port: BufferedLinkReceiver<B>,
_phantom: PhantomData<Transaction>,
@@ -46,7 +48,8 @@ pub struct BasicQueue<B: BlockT, Transaction> {
impl<B: BlockT, Transaction> Drop for BasicQueue<B, Transaction> {
fn drop(&mut self) {
// Flush the queue and close the receiver to terminate the future.
self.sender.close_channel();
self.finality_sender.close_channel();
self.block_import_sender.close_channel();
self.result_port.close();
}
}
@@ -65,12 +68,16 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
prometheus_registry: Option<&Registry>,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let metrics = prometheus_registry.and_then(|r|
let metrics = prometheus_registry.and_then(|r| {
Metrics::register(r)
.map_err(|err| { log::warn!("Failed to register Prometheus metrics: {}", err); })
.ok()
);
let (future, worker_sender) = BlockImportWorker::new(
.map_err(|err| {
log::warn!("Failed to register Prometheus metrics: {}", err);
})
.ok()
});
let (future, finality_sender, block_import_sender) = BlockImportWorker::new(
result_sender,
verifier,
block_import,
@@ -82,7 +89,8 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
spawner.spawn_blocking("basic-block-import-worker", future.boxed());
Self {
sender: worker_sender,
finality_sender,
block_import_sender,
result_port,
_phantom: PhantomData,
}
@@ -96,7 +104,9 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
}
trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
let res = self.sender.unbounded_send(ToWorkerMsg::ImportBlocks(origin, blocks));
let res =
self.block_import_sender.unbounded_send(worker_messages::ImportBlocks(origin, blocks));
if res.is_err() {
log::error!(
target: "sync",
@@ -110,12 +120,12 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
who: Origin,
hash: B::Hash,
number: NumberFor<B>,
justification: Justification
justification: Justification,
) {
let res = self.sender
.unbounded_send(
ToWorkerMsg::ImportJustification(who, hash, number, justification)
);
let res = self.finality_sender.unbounded_send(
worker_messages::Finality::ImportJustification(who, hash, number, justification),
);
if res.is_err() {
log::error!(
target: "sync",
@@ -132,10 +142,10 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
finality_proof: Vec<u8>,
) {
trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
let res = self.sender
.unbounded_send(
ToWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof)
);
let res = self.finality_sender.unbounded_send(
worker_messages::Finality::ImportFinalityProof(who, hash, number, finality_proof),
);
if res.is_err() {
log::error!(
target: "sync",
@@ -151,12 +161,16 @@ impl<B: BlockT, Transaction: Send> ImportQueue<B> for BasicQueue<B, Transaction>
}
}
/// Message destinated to the background worker.
#[derive(Debug)]
enum ToWorkerMsg<B: BlockT> {
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Messages destinated to the background worker.
mod worker_messages {
use super::*;
pub struct ImportBlocks<B: BlockT>(pub BlockOrigin, pub Vec<IncomingBlock<B>>);
pub enum Finality<B: BlockT> {
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
}
}
struct BlockImportWorker<B: BlockT, Transaction> {
@@ -176,8 +190,18 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
metrics: Option<Metrics>,
) -> (impl Future<Output = ()> + Send, TracingUnboundedSender<ToWorkerMsg<B>>) {
let (sender, mut port) = tracing_unbounded("mpsc_block_import_worker");
) -> (
impl Future<Output = ()> + Send,
TracingUnboundedSender<worker_messages::Finality<B>>,
TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
) {
use worker_messages::*;
let (finality_sender, mut finality_port) =
tracing_unbounded("mpsc_import_queue_worker_finality");
let (block_import_sender, mut block_import_port) =
tracing_unbounded("mpsc_import_queue_worker_blocks");
let mut worker = BlockImportWorker {
result_sender,
@@ -206,6 +230,8 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
// `Future`, and `block_import` is `None`.
// - Something else, in which case `block_import` is `Some` and `importing` is None.
//
// Additionally, the task will prioritize processing of finality work messages over
// block import messages, hence why two distinct channels are used.
let mut block_import_verifier = Some((block_import, verifier));
let mut importing = None;
@@ -217,7 +243,30 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
return Poll::Ready(())
}
// If we are in the process of importing a bunch of block, let's resume this
// Grab the next finality action request sent to the import queue.
let finality_work = match Stream::poll_next(Pin::new(&mut finality_port), cx) {
Poll::Ready(Some(msg)) => Some(msg),
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => None,
};
match finality_work {
Some(Finality::ImportFinalityProof(who, hash, number, proof)) => {
let (_, verif) = block_import_verifier
.as_mut()
.expect("block_import_verifier is always Some; qed");
worker.import_finality_proof(verif, who, hash, number, proof);
continue;
}
Some(Finality::ImportJustification(who, hash, number, justification)) => {
worker.import_justification(who, hash, number, justification);
continue;
}
None => {}
}
// If we are in the process of importing a bunch of blocks, let's resume this
// process before doing anything more.
if let Some(imp_fut) = importing.as_mut() {
match Future::poll(Pin::new(imp_fut), cx) {
@@ -232,34 +281,25 @@ impl<B: BlockT, Transaction: Send> BlockImportWorker<B, Transaction> {
debug_assert!(importing.is_none());
debug_assert!(block_import_verifier.is_some());
// Grab the next action request sent to the import queue.
let msg = match Stream::poll_next(Pin::new(&mut port), cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
};
// Grab the next block import request sent to the import queue.
let ImportBlocks(origin, blocks) =
match Stream::poll_next(Pin::new(&mut block_import_port), cx) {
Poll::Ready(Some(msg)) => msg,
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
};
match msg {
ToWorkerMsg::ImportBlocks(origin, blocks) => {
// On blocks import request, we merely *start* the process and store
// a `Future` into `importing`.
let (bi, verif) = block_import_verifier.take()
.expect("block_import_verifier is always Some; qed");
importing = Some(worker.import_batch(bi, verif, origin, blocks));
},
ToWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
let (_, verif) = block_import_verifier.as_mut()
.expect("block_import_verifier is always Some; qed");
worker.import_finality_proof(verif, who, hash, number, proof);
},
ToWorkerMsg::ImportJustification(who, hash, number, justification) => {
worker.import_justification(who, hash, number, justification);
}
}
// On blocks import request, we merely *start* the process and store
// a `Future` into `importing`.
let (block_import, verifier) = block_import_verifier
.take()
.expect("block_import_verifier is always Some; qed");
importing = Some(worker.import_batch(block_import, verifier, origin, blocks));
}
});
(future, sender)
(future, finality_sender, block_import_sender)
}
/// Returns a `Future` that imports the given blocks and sends the results on
@@ -363,12 +403,11 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
Output = (
usize,
usize,
Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash,)>,
Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>,
BoxBlockImport<B, Transaction>,
V
)
>
{
V,
),
> {
let count = blocks.len();
let blocks_range = match (
@@ -461,3 +500,187 @@ fn import_many_blocks<B: BlockT, V: Verifier<B>, Transaction>(
Poll::Pending
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
import_queue::{CacheKeyId, Verifier},
BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
};
use futures::{executor::block_on, Future};
use sp_test_primitives::{Block, BlockNumber, Extrinsic, Hash, Header};
use std::collections::HashMap;
impl Verifier<Block> for () {
fn verify(
&mut self,
origin: BlockOrigin,
header: Header,
_justification: Option<Justification>,
_body: Option<Vec<Extrinsic>>,
) -> Result<(BlockImportParams<Block, ()>, Option<Vec<(CacheKeyId, Vec<u8>)>>), String> {
Ok((BlockImportParams::new(origin, header), None))
}
}
impl BlockImport<Block> for () {
type Error = crate::Error;
type Transaction = Extrinsic;
fn check_block(
&mut self,
_block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
Ok(ImportResult::imported(false))
}
fn import_block(
&mut self,
_block: BlockImportParams<Block, Self::Transaction>,
_cache: HashMap<CacheKeyId, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
Ok(ImportResult::imported(true))
}
}
impl JustificationImport<Block> for () {
type Error = crate::Error;
fn import_justification(
&mut self,
_hash: Hash,
_number: BlockNumber,
_justification: Justification,
) -> Result<(), Self::Error> {
Ok(())
}
}
#[derive(Debug, PartialEq)]
enum Event {
JustificationImported(Hash),
BlockImported(Hash),
}
#[derive(Default)]
struct TestLink {
events: Vec<Event>,
}
impl Link<Block> for TestLink {
fn blocks_processed(
&mut self,
_imported: usize,
_count: usize,
results: Vec<(Result<BlockImportResult<BlockNumber>, BlockImportError>, Hash)>,
) {
if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) {
self.events.push(Event::BlockImported(hash));
}
}
fn justification_imported(
&mut self,
_who: Origin,
hash: &Hash,
_number: BlockNumber,
_success: bool,
) {
self.events.push(Event::JustificationImported(hash.clone()))
}
}
#[test]
fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link();
let (mut worker, mut finality_sender, mut block_import_sender) =
BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None, None);
let mut import_block = |n| {
let header = Header {
parent_hash: Hash::random(),
number: n,
extrinsics_root: Hash::random(),
state_root: Default::default(),
digest: Default::default(),
};
let hash = header.hash();
block_on(block_import_sender.send(worker_messages::ImportBlocks(
BlockOrigin::Own,
vec![IncomingBlock {
hash,
header: Some(header),
body: None,
justification: None,
origin: None,
allow_missing_state: false,
import_existing: false,
}],
)))
.unwrap();
hash
};
let mut import_justification = || {
let hash = Hash::random();
block_on(finality_sender.send(worker_messages::Finality::ImportJustification(
libp2p::PeerId::random(),
hash,
1,
Vec::new(),
)))
.unwrap();
hash
};
let mut link = TestLink::default();
// we send a bunch of tasks to the worker
let block1 = import_block(1);
let block2 = import_block(2);
let block3 = import_block(3);
let justification1 = import_justification();
let justification2 = import_justification();
let block4 = import_block(4);
let block5 = import_block(5);
let block6 = import_block(6);
let justification3 = import_justification();
// we poll the worker until we have processed 9 events
block_on(futures::future::poll_fn(|cx| {
while link.events.len() < 9 {
match Future::poll(Pin::new(&mut worker), cx) {
Poll::Pending => {}
Poll::Ready(()) => panic!("import queue worker should not conclude."),
}
result_port.poll_actions(cx, &mut link).unwrap();
}
Poll::Ready(())
}));
// all justification tasks must be done before any block import work
assert_eq!(
link.events,
vec![
Event::JustificationImported(justification1),
Event::JustificationImported(justification2),
Event::JustificationImported(justification3),
Event::BlockImported(block1),
Event::BlockImported(block2),
Event::BlockImported(block3),
Event::BlockImported(block4),
Event::BlockImported(block5),
Event::BlockImported(block6),
]
);
}
}