Rewrite the BasiQueue using channels (#1327)

* use channels to implement basic import queue

* async justification import

* better conditional for is_done in tests

* reword the test for presence of link

* fix conditional

* trace instead of panic when no link present

* reword expectations when sending to importers

* fix

* debug justification import error

* update expectations

* use NumberFor

* nits

* add general description

* move error handling into closure
This commit is contained in:
Gregory Terzian
2019-02-17 17:13:14 +08:00
committed by Gav Wood
parent 797de27d2b
commit 72bb8ef4c5
16 changed files with 614 additions and 575 deletions
@@ -25,19 +25,21 @@
//! instantiated simply.
use crate::block_import::{ImportBlock, BlockImport, JustificationImport, ImportResult, BlockOrigin};
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use parking_lot::{Condvar, Mutex, RwLock};
use log::{trace, debug};
use crossbeam_channel::{self as channel, Receiver, Sender};
use std::collections::HashSet;
use std::sync::Arc;
use std::thread;
use runtime_primitives::traits::{
AuthorityIdFor, Block as BlockT, Header as HeaderT, NumberFor, Zero,
};
use runtime_primitives::Justification;
use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero, AuthorityIdFor};
use crate::error::Error as ConsensusError;
/// Shared block import struct used by the queue.
pub type SharedBlockImport<B> = Arc<dyn BlockImport<B, Error=ConsensusError> + Send + Sync>;
pub type SharedBlockImport<B> = Arc<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>;
/// Shared justification import struct used by the queue.
pub type SharedJustificationImport<B> = Arc<dyn JustificationImport<B, Error=ConsensusError> + Send + Sync>;
@@ -70,20 +72,17 @@ pub trait Verifier<B: BlockT>: Send + Sync + Sized {
origin: BlockOrigin,
header: B::Header,
justification: Option<Justification>,
body: Option<Vec<B::Extrinsic>>
body: Option<Vec<B::Extrinsic>>,
) -> Result<(ImportBlock<B>, Option<Vec<AuthorityIdFor<B>>>), String>;
}
/// Blocks import queue API.
pub trait ImportQueue<B: BlockT>: Send + Sync {
pub trait ImportQueue<B: BlockT>: Send + Sync + ImportQueueClone<B> {
/// Start background work for the queue as necessary.
///
/// This is called automatically by the network service when synchronization
/// begins.
fn start<L>(&self, _link: L) -> Result<(), std::io::Error> where
Self: Sized,
L: 'static + Link<B>,
{
fn start(&self, _link: Box<Link<B>>) -> Result<(), std::io::Error> {
Ok(())
}
/// Clear the queue when sync is restarting.
@@ -97,199 +96,433 @@ pub trait ImportQueue<B: BlockT>: Send + Sync {
/// Import bunch of blocks.
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
/// Import a block justification.
fn import_justification(&self, hash: B::Hash, number: NumberFor<B>, justification: Justification) -> bool;
fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, justification: Justification);
}
pub trait ImportQueueClone<B: BlockT> {
fn clone_box(&self) -> Box<ImportQueue<B>>;
}
impl<B: BlockT> Clone for Box<ImportQueue<B>> {
fn clone(&self) -> Box<ImportQueue<B>> {
self.clone_box()
}
}
/// Import queue status. It isn't completely accurate.
#[derive(Debug)]
pub struct ImportQueueStatus<B: BlockT> {
/// Number of blocks that are currently in the queue.
pub importing_count: usize,
/// The number of the best block that was ever in the queue since start/last failure.
pub best_importing_number: <<B as BlockT>::Header as HeaderT>::Number,
pub best_importing_number: NumberFor<B>,
}
/// Basic block import queue that is importing blocks sequentially in a separate thread,
/// Interface to a basic block import queue that is importing blocks sequentially in a separate thread,
/// with pluggable verification.
pub struct BasicQueue<B: BlockT, V: 'static + Verifier<B>> {
handle: Mutex<Option<::std::thread::JoinHandle<()>>>,
data: Arc<AsyncImportQueueData<B>>,
verifier: Arc<V>,
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>,
#[derive(Clone)]
pub struct BasicQueue<B: BlockT> {
sender: Sender<BlockImportMsg<B>>,
}
/// Locks order: queue, queue_blocks, best_importing_number
pub struct AsyncImportQueueData<B: BlockT> {
signal: Condvar,
queue: Mutex<VecDeque<(BlockOrigin, Vec<IncomingBlock<B>>)>>,
queue_blocks: RwLock<HashSet<B::Hash>>,
best_importing_number: RwLock<<<B as BlockT>::Header as HeaderT>::Number>,
is_stopping: AtomicBool,
impl<B: BlockT> ImportQueueClone<B> for BasicQueue<B> {
fn clone_box(&self) -> Box<ImportQueue<B>> {
Box::new(self.clone())
}
}
impl<B: BlockT, V: Verifier<B>> BasicQueue<B, V> {
/// Instantiate a new basic queue, with given verifier and justification import.
pub fn new(verifier: Arc<V>, block_import: SharedBlockImport<B>, justification_import: Option<SharedJustificationImport<B>>) -> Self {
/// "BasicQueue" is a wrapper around a channel sender to the "BlockImporter".
/// "BasicQueue" itself does not keep any state or do any importing work, and can therefore be send to other threads.
///
/// "BasiqQueue" implements "ImportQueue" by sending messages to the "BlockImporter", which runs in it's own thread.
///
/// The "BlockImporter" is responsible for handling incoming requests from the "BasicQueue",
/// some of these requests are handled by the "BlockImporter" itself, such as "is_importing" or "status",
/// and justifications are also imported by the "BlockImporter".
///
/// The "import block" work will be offloaded to a single "BlockImportWorker", running in another thread.
/// Offloading the work is done via a channel,
/// ensuring blocks in this implementation are imported sequentially and in order(as received by the "BlockImporter")
///
/// As long as the "BasicQueue" is not dropped, the "BlockImporter" will keep running.
/// The "BlockImporter" owns a sender to the "BlockImportWorker", ensuring that the worker is kept alive until that sender is dropped.
impl<B: BlockT> BasicQueue<B> {
/// Instantiate a new basic queue, with given verifier.
pub fn new<V: 'static + Verifier<B>>(
verifier: Arc<V>,
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>
) -> Self {
let (result_sender, result_port) = channel::unbounded();
let worker_sender = BlockImportWorker::new(result_sender, verifier, block_import);
let importer_sender = BlockImporter::new(result_port, worker_sender, justification_import);
Self {
handle: Mutex::new(None),
data: Arc::new(AsyncImportQueueData::new()),
verifier,
block_import,
justification_import,
sender: importer_sender,
}
}
}
impl<B: BlockT> AsyncImportQueueData<B> {
/// Instantiate a new async import queue data.
pub fn new() -> Self {
Self {
signal: Default::default(),
queue: Mutex::new(VecDeque::new()),
queue_blocks: RwLock::new(HashSet::new()),
best_importing_number: RwLock::new(Zero::zero()),
is_stopping: Default::default(),
}
}
// Signals to stop importing new blocks.
pub fn stop(&self) {
self.is_stopping.store(true, Ordering::SeqCst);
}
}
impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> {
fn start<L: 'static + Link<B>>(
&self,
link: L,
) -> Result<(), std::io::Error> {
debug_assert!(self.handle.lock().is_none());
let qdata = self.data.clone();
let verifier = self.verifier.clone();
let block_import = self.block_import.clone();
let justification_import = self.justification_import.clone();
*self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || {
if let Some(justification_import) = justification_import.as_ref() {
justification_import.on_start(&link);
}
import_thread(block_import, link, qdata, verifier)
})?);
impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
fn start(&self, link: Box<Link<B>>) -> Result<(), std::io::Error> {
let _ = self
.sender
.send(BlockImportMsg::Start(link))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
Ok(())
}
fn clear(&self) {
let mut queue = self.data.queue.lock();
let mut queue_blocks = self.data.queue_blocks.write();
let mut best_importing_number = self.data.best_importing_number.write();
queue_blocks.clear();
queue.clear();
*best_importing_number = Zero::zero();
let _ = self
.sender
.send(BlockImportMsg::Clear)
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
}
fn stop(&self) {
self.clear();
if let Some(handle) = self.handle.lock().take() {
{
// Perform storing the stop flag and signalling under a single lock.
let _queue_lock = self.data.queue.lock();
self.data.stop();
self.data.signal.notify_one();
}
let _ = handle.join();
}
let _ = self
.sender
.send(BlockImportMsg::Stop)
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
}
fn status(&self) -> ImportQueueStatus<B> {
ImportQueueStatus {
importing_count: self.data.queue_blocks.read().len(),
best_importing_number: *self.data.best_importing_number.read(),
}
let (sender, port) = channel::unbounded();
let _ = self
.sender
.send(BlockImportMsg::Status(sender))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed")
}
fn is_importing(&self, hash: &B::Hash) -> bool {
self.data.queue_blocks.read().contains(hash)
let (sender, port) = channel::unbounded();
let _ = self
.sender
.send(BlockImportMsg::IsImporting(hash.clone(), sender))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed")
}
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
if blocks.is_empty() {
return;
}
trace!(target:"sync", "Scheduling {} blocks for import", blocks.len());
let mut queue = self.data.queue.lock();
let mut queue_blocks = self.data.queue_blocks.write();
let mut best_importing_number = self.data.best_importing_number.write();
let new_best_importing_number = blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number().clone())).unwrap_or_else(|| Zero::zero());
queue_blocks.extend(blocks.iter().map(|b| b.hash.clone()));
if new_best_importing_number > *best_importing_number {
*best_importing_number = new_best_importing_number;
}
queue.push_back((origin, blocks));
self.data.signal.notify_one();
let _ = self
.sender
.send(BlockImportMsg::ImportBlocks(origin, blocks))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
}
fn import_justification(&self, hash: B::Hash, number: NumberFor<B>, justification: Justification) -> bool {
self.justification_import.as_ref().map(|justification_import| {
justification_import.import_justification(hash, number, justification).is_ok()
}).unwrap_or(false)
fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, justification: Justification) {
let _ = self
.sender
.send(BlockImportMsg::ImportJustification(who, hash, number, justification))
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
}
}
impl<B: BlockT, V: 'static + Verifier<B>> Drop for BasicQueue<B, V> {
fn drop(&mut self) {
self.stop();
}
pub enum BlockImportMsg<B: BlockT> {
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
Clear,
Status(Sender<ImportQueueStatus<B>>),
IsImporting(B::Hash, Sender<bool>),
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
Start(Box<Link<B>>),
Stop,
}
/// Blocks import thread.
fn import_thread<B: BlockT, L: Link<B>, V: Verifier<B>>(
block_import: SharedBlockImport<B>,
link: L,
qdata: Arc<AsyncImportQueueData<B>>,
verifier: Arc<V>
) {
trace!(target: "sync", "Starting import thread");
loop {
let new_blocks = {
let mut queue_lock = qdata.queue.lock();
pub enum BlockImportWorkerMsg<B: BlockT> {
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
Imported(
Vec<(
Result<BlockImportResult<NumberFor<B>>, BlockImportError>,
B::Hash,
)>,
),
}
// We are holding the same lock that `stop` takes so here we either see that stop flag
// is active or wait for the signal. The latter one unlocks the mutex and this gives a chance
// to `stop` to generate the signal.
if qdata.is_stopping.load(Ordering::SeqCst) {
break;
}
if queue_lock.is_empty() {
qdata.signal.wait(&mut queue_lock);
}
enum ImportMsgType<B: BlockT> {
FromWorker(BlockImportWorkerMsg<B>),
FromNetwork(BlockImportMsg<B>),
}
match queue_lock.pop_front() {
Some(new_blocks) => new_blocks,
None => break,
struct BlockImporter<B: BlockT> {
port: Receiver<BlockImportMsg<B>>,
result_port: Receiver<BlockImportWorkerMsg<B>>,
worker_sender: Sender<BlockImportWorkerMsg<B>>,
queue_blocks: HashSet<B::Hash>,
best_importing_number: NumberFor<B>,
link: Option<Box<dyn Link<B>>>,
justification_import: Option<SharedJustificationImport<B>>,
}
impl<B: BlockT> BlockImporter<B> {
fn new(
result_port: Receiver<BlockImportWorkerMsg<B>>,
worker_sender: Sender<BlockImportWorkerMsg<B>>,
justification_import: Option<SharedJustificationImport<B>>,
) -> Sender<BlockImportMsg<B>> {
let (sender, port) = channel::unbounded();
let _ = thread::Builder::new()
.name("ImportQueue".into())
.spawn(move || {
let mut importer = BlockImporter {
port,
result_port,
worker_sender,
queue_blocks: HashSet::new(),
best_importing_number: Zero::zero(),
link: None,
justification_import,
};
while importer.run() {
// Importing until all senders have been dropped...
}
})
.expect("ImportQueue thread spawning failed");
sender
}
fn run(&mut self) -> bool {
let msg = select! {
recv(self.port) -> msg => {
match msg {
// Our sender has been dropped, quitting.
Err(_) => return false,
Ok(msg) => ImportMsgType::FromNetwork(msg)
}
},
recv(self.result_port) -> msg => {
match msg {
Err(_) => unreachable!("1. We hold a sender to the Worker, 2. it should not quit until that sender is dropped; qed"),
Ok(msg) => ImportMsgType::FromWorker(msg),
}
}
};
let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.hash.clone()).collect();
if !import_many_blocks(
&*block_import,
&link,
Some(&*qdata),
new_blocks,
verifier.clone(),
) {
break;
}
let mut queue_blocks = qdata.queue_blocks.write();
for blocks_hash in blocks_hashes {
queue_blocks.remove(&blocks_hash);
match msg {
ImportMsgType::FromNetwork(msg) => self.handle_network_msg(msg),
ImportMsgType::FromWorker(msg) => self.handle_worker_msg(msg),
}
}
trace!(target: "sync", "Stopping import thread");
fn handle_network_msg(&mut self, msg: BlockImportMsg<B>) -> bool {
match msg {
BlockImportMsg::ImportBlocks(origin, incoming_blocks) => {
self.handle_import_blocks(origin, incoming_blocks)
}
BlockImportMsg::Clear => self.handle_clear(),
BlockImportMsg::Status(reply_sender) => self.handle_status(reply_sender),
BlockImportMsg::IsImporting(hash, reply_sender) => {
self.handle_is_importing(hash, reply_sender)
}
BlockImportMsg::ImportJustification(who, hash, number, justification) => {
self.handle_import_justification(who, hash, number, justification)
}
BlockImportMsg::Start(link) => {
if let Some(justification_import) = self.justification_import.as_ref() {
justification_import.on_start(&*link);
}
self.link = Some(link);
}
BlockImportMsg::Stop => {
self.handle_clear();
return false;
}
}
true
}
fn handle_worker_msg(&mut self, msg: BlockImportWorkerMsg<B>) -> bool {
let results = match msg {
BlockImportWorkerMsg::Imported(results) => (results),
_ => unreachable!("Import Worker does not send ImportBlocks message; qed"),
};
let mut has_error = false;
for (result, hash) in results {
self.queue_blocks.remove(&hash);
if has_error {
continue;
}
if result.is_err() {
self.best_importing_number = Zero::zero();
has_error = true;
}
let link = match self.link.as_ref() {
Some(link) => link,
None => {
trace!(target: "sync", "Received import result for {} while import-queue has no link", hash);
return true;
},
};
match result {
Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number),
Ok(BlockImportResult::ImportedUnknown(number)) => {
link.block_imported(&hash, number)
}
Ok(BlockImportResult::ImportedUnjustified(number)) => {
link.block_imported(&hash, number);
link.request_justification(&hash, number);
},
Err(BlockImportError::IncompleteHeader(who)) => {
if let Some(peer) = who {
link.note_useless_and_restart_sync(peer, "Sent block with incomplete header to import");
}
}
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
link.note_useless_and_restart_sync(peer, &format!("Verification failed: {}", e));
}
}
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
link.note_useless_and_restart_sync(peer, "Sent us a bad block");
}
}
Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => {
link.restart()
}
};
}
if let Some(link) = self.link.as_ref() {
link.maintain_sync();
}
true
}
fn handle_clear(&mut self) {
self.queue_blocks.clear();
self.best_importing_number = Zero::zero();
}
fn handle_status(&self, reply_sender: Sender<ImportQueueStatus<B>>) {
let status = ImportQueueStatus {
importing_count: self.queue_blocks.len(),
best_importing_number: self.best_importing_number,
};
let _ = reply_sender.send(status);
}
fn handle_is_importing(&self, hash: B::Hash, reply_sender: Sender<bool>) {
let _ = reply_sender.send(self.queue_blocks.contains(&hash));
}
fn handle_import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, justification: Justification) {
let success = self.justification_import.as_ref().map(|justification_import| {
justification_import.import_justification(hash, number, justification)
.map_err(|e| {
debug!("Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", e, hash, number, who);
e
}).is_ok()
}).unwrap_or(false);
if let Some(link) = self.link.as_ref() {
link.justification_imported(who, &hash, number, success);
}
}
fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
trace!(target:"sync", "Scheduling {} blocks for import", blocks.len());
let new_best_importing_number = blocks
.last()
.and_then(|b| b.header.as_ref().map(|h| h.number().clone()))
.unwrap_or_else(|| Zero::zero());
self.queue_blocks
.extend(blocks.iter().map(|b| b.hash.clone()));
if new_best_importing_number > self.best_importing_number {
self.best_importing_number = new_best_importing_number;
}
self.worker_sender
.send(BlockImportWorkerMsg::ImportBlocks(origin, blocks))
.expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed");
}
}
struct BlockImportWorker<B: BlockT, V: Verifier<B>> {
result_sender: Sender<BlockImportWorkerMsg<B>>,
block_import: SharedBlockImport<B>,
verifier: Arc<V>,
}
impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
pub fn new(
result_sender: Sender<BlockImportWorkerMsg<B>>,
verifier: Arc<V>,
block_import: SharedBlockImport<B>,
) -> Sender<BlockImportWorkerMsg<B>> {
let (sender, port) = channel::unbounded();
let _ = thread::Builder::new()
.name("ImportQueueWorker".into())
.spawn(move || {
let worker = BlockImportWorker {
result_sender,
verifier,
block_import,
};
for msg in port.iter() {
// Working until all senders have been dropped...
match msg {
BlockImportWorkerMsg::ImportBlocks(origin, blocks) => {
worker.import_a_batch_of_blocks(origin, blocks)
}
_ => unreachable!("Import Worker does not receive the Imported message; qed"),
}
}
})
.expect("ImportQueueWorker thread spawning failed");
sender
}
fn import_a_batch_of_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
let count = blocks.len();
let mut imported = 0;
let blocks_range = match (
blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target:"sync", "Starting import of {} blocks {}", count, blocks_range);
let mut results = vec![];
let mut has_error = false;
// Blocks in the response/drain should be in ascending order.
for block in blocks {
let import_result = if has_error {
Err(BlockImportError::Error)
} else {
import_single_block(
&*self.block_import,
origin.clone(),
block.clone(),
self.verifier.clone(),
)
};
let was_ok = import_result.is_ok();
results.push((import_result, block.hash));
if was_ok {
imported += 1;
} else {
has_error = true;
}
}
let _ = self
.result_sender
.send(BlockImportWorkerMsg::Imported(results));
trace!(target: "sync", "Imported {} of {}", imported, count);
}
}
/// Hooks that the verification queue can use to influence the synchronization
@@ -297,27 +530,29 @@ fn import_thread<B: BlockT, L: Link<B>, V: Verifier<B>>(
pub trait Link<B: BlockT>: Send {
/// Block imported.
fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) { }
/// Justification import result.
fn justification_imported(&self, _who: Origin, _hash: &B::Hash, _number: NumberFor<B>, _success: bool) { }
/// Request a justification for the given block.
fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) { }
/// Maintain sync.
fn maintain_sync(&self) { }
fn maintain_sync(&self) {}
/// Disconnect from peer.
fn useless_peer(&self, _who: Origin, _reason: &str) { }
fn useless_peer(&self, _who: Origin, _reason: &str) {}
/// Disconnect from peer and restart sync.
fn note_useless_and_restart_sync(&self, _who: Origin, _reason: &str) { }
fn note_useless_and_restart_sync(&self, _who: Origin, _reason: &str) {}
/// Restart sync.
fn restart(&self) { }
fn restart(&self) {}
}
/// Block import successful result.
#[derive(Debug, PartialEq)]
pub enum BlockImportResult<H: ::std::fmt::Debug + PartialEq, N: ::std::fmt::Debug + PartialEq> {
pub enum BlockImportResult<N: ::std::fmt::Debug + PartialEq> {
/// Imported known block.
ImportedKnown(H, N),
ImportedKnown(N),
/// Imported unknown block.
ImportedUnknown(H, N),
ImportedUnknown(N),
/// Imported unjustified block that requires one.
ImportedUnjustified(H, N),
ImportedUnjustified(N),
}
/// Block import error.
@@ -335,62 +570,13 @@ pub enum BlockImportError {
Error,
}
/// Import a bunch of blocks.
pub fn import_many_blocks<'a, B: BlockT, V: Verifier<B>>(
import_handle: &BlockImport<B, Error=ConsensusError>,
link: &Link<B>,
qdata: Option<&AsyncImportQueueData<B>>,
blocks: (BlockOrigin, Vec<IncomingBlock<B>>),
verifier: Arc<V>
) -> bool
{
let (blocks_origin, blocks) = blocks;
let count = blocks.len();
let mut imported = 0;
let blocks_range = match (
blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target:"sync", "Starting import of {} blocks {}", count, blocks_range);
// Blocks in the response/drain should be in ascending order.
for block in blocks {
let import_result = import_single_block(
import_handle,
blocks_origin.clone(),
block,
verifier.clone(),
);
let is_import_failed = import_result.is_err();
imported += process_import_result(link, import_result);
if is_import_failed {
qdata.map(|qdata| *qdata.best_importing_number.write() = Zero::zero());
return true;
}
if qdata.map(|qdata| qdata.is_stopping.load(Ordering::SeqCst)).unwrap_or_default() {
return false;
}
}
trace!(target: "sync", "Imported {} of {}", imported, count);
link.maintain_sync();
true
}
/// Single block import function.
pub fn import_single_block<B: BlockT, V: Verifier<B>>(
import_handle: &BlockImport<B,Error=ConsensusError>,
import_handle: &BlockImport<B, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: Arc<V>
) -> Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
{
verifier: Arc<V>,
) -> Result<BlockImportResult<NumberFor<B>>, BlockImportError> {
let peer = block.origin;
let (header, justification) = match (block.header, block.justification) {
@@ -413,18 +599,18 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
match e {
Ok(ImportResult::AlreadyInChain) => {
trace!(target: "sync", "Block already in chain {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(hash, number))
Ok(BlockImportResult::ImportedKnown(number))
},
Ok(ImportResult::AlreadyQueued) => {
trace!(target: "sync", "Block already queued {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedKnown(hash, number))
Ok(BlockImportResult::ImportedKnown(number))
},
Ok(ImportResult::Queued) => {
Ok(BlockImportResult::ImportedUnknown(hash, number))
Ok(BlockImportResult::ImportedUnknown(number))
},
Ok(ImportResult::NeedsJustification) => {
trace!(target: "sync", "Block queued but requires justification {}: {:?}", number, hash);
Ok(BlockImportResult::ImportedUnjustified(hash, number))
Ok(BlockImportResult::ImportedUnjustified(number))
},
Ok(ImportResult::UnknownParent) => {
debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent);
@@ -442,7 +628,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
};
match import_error(import_handle.check_block(hash, parent))? {
BlockImportResult::ImportedUnknown(_, _) => (),
BlockImportResult::ImportedUnknown(_) => (),
r @ _ => return Ok(r), // Any other successfull result means that the block is already imported.
}
@@ -459,47 +645,102 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>(
import_error(import_handle.import_block(import_block, new_authorities))
}
/// Process single block import result.
pub fn process_import_result<B: BlockT>(
link: &Link<B>,
result: Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError>
) -> usize
{
match result {
Ok(BlockImportResult::ImportedKnown(hash, number)) => {
link.block_imported(&hash, number);
1
},
Ok(BlockImportResult::ImportedUnknown(hash, number)) => {
link.block_imported(&hash, number);
1
},
Ok(BlockImportResult::ImportedUnjustified(hash, number)) => {
link.block_imported(&hash, number);
link.request_justification(&hash, number);
1
},
Err(BlockImportError::IncompleteHeader(who)) => {
if let Some(peer) = who {
link.note_useless_and_restart_sync(peer, "Sent block with incomplete header to import");
#[cfg(test)]
mod tests {
use super::*;
use test_client::runtime::{Block, Hash};
#[derive(Debug, PartialEq)]
enum LinkMsg {
BlockImported,
Disconnected,
Restarted,
}
#[derive(Clone)]
struct TestLink {
sender: Sender<LinkMsg>,
}
impl TestLink {
fn new(sender: Sender<LinkMsg>) -> TestLink {
TestLink {
sender,
}
0
},
Err(BlockImportError::VerificationFailed(who, e)) => {
if let Some(peer) = who {
link.note_useless_and_restart_sync(peer, &format!("Verification failed: {}", e));
}
0
},
Err(BlockImportError::BadBlock(who)) => {
if let Some(peer) = who {
link.note_useless_and_restart_sync(peer, "Sent us a bad block");
}
0
},
Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => {
link.restart();
0
},
}
}
impl Link<Block> for TestLink {
fn block_imported(&self, _hash: &Hash, _number: NumberFor<Block>) {
let _ = self.sender.send(LinkMsg::BlockImported);
}
fn maintain_sync(&self) {
}
fn useless_peer(&self, _: Origin, _: &str) {
let _ = self.sender.send(LinkMsg::Disconnected);
}
fn note_useless_and_restart_sync(&self, id: Origin, r: &str) {
self.useless_peer(id, r);
self.restart();
}
fn restart(&self) {
let _ = self.sender.send(LinkMsg::Restarted);
}
}
#[test]
fn process_import_result_works() {
let (result_sender, result_port) = channel::unbounded();
let (worker_sender, _) = channel::unbounded();
let (link_sender, link_port) = channel::unbounded();
let importer_sender = BlockImporter::<Block>::new(result_port, worker_sender, None);
let link = TestLink::new(link_sender);
let _ = importer_sender.send(BlockImportMsg::Start(Box::new(link.clone())));
// Ensure the importer handles Start before any result messages.
let (ack_sender, ack_port) = channel::unbounded();
let _ = importer_sender.send(BlockImportMsg::Status(ack_sender));
let _ = ack_port.recv();
// Send a known
let results = vec![(Ok(BlockImportResult::ImportedKnown(Default::default())), Default::default())];
let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap();
assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported));
// Send a second known
let results = vec![(Ok(BlockImportResult::ImportedKnown(Default::default())), Default::default())];
let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap();
assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported));
// Send an unknown
let results = vec![(Ok(BlockImportResult::ImportedUnknown(Default::default())), Default::default())];
let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap();
assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported));
// Send an incomplete header
let results = vec![(Err(BlockImportError::IncompleteHeader(Some(Default::default()))), Default::default())];
let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap();
assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected));
assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted));
// Send an unknown parent
let results = vec![(Err(BlockImportError::UnknownParent), Default::default())];
let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap();
assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted));
// Send a verification failed
let results = vec![(Err(BlockImportError::VerificationFailed(Some(0), String::new())), Default::default())];
let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap();
assert_eq!(link_port.recv(), Ok(LinkMsg::Disconnected));
assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted));
// Send an error
let results = vec![(Err(BlockImportError::Error), Default::default())];
let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap();
assert_eq!(link_port.recv(), Ok(LinkMsg::Restarted));
// Drop the importer sender first, ensuring graceful shutdown.
drop(importer_sender);
}
}
@@ -26,6 +26,9 @@
// our error-chain could potentially blow up otherwise
#![recursion_limit="128"]
#[macro_use] extern crate crossbeam_channel;
#[macro_use] extern crate log;
use std::sync::Arc;
use std::time::Duration;