Change the import queue traits to take &mut self instead of &self (#3058)

* SharedFinalityProofRequestBuilder -> BoxFinalityProofRequestBuilder

* SharedThings -> BoxThings

* Fix tests

* build_request_data now takes &mut self

* The other traits now also take &mut self

* More or less fix tests

* Fix tests

* Fix more tests

* Moar tests

* Don't call make_block_import multiple time

* Fix doctest
This commit is contained in:
Pierre Krieger
2019-07-09 17:11:25 +02:00
committed by Gavin Wood
parent e729dbabbe
commit d7b6720663
21 changed files with 268 additions and 150 deletions
@@ -20,6 +20,7 @@ use runtime_primitives::traits::{Block as BlockT, DigestItemFor, Header as Heade
use runtime_primitives::Justification;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use crate::well_known_cache_keys;
use crate::import_queue::Verifier;
@@ -175,7 +176,7 @@ pub trait BlockImport<B: BlockT> {
/// Check block preconditions.
fn check_block(
&self,
&mut self,
hash: B::Hash,
parent_hash: B::Hash,
) -> Result<ImportResult, Self::Error>;
@@ -184,23 +185,45 @@ pub trait BlockImport<B: BlockT> {
///
/// Cached data can be accessed through the blockchain cache.
fn import_block(
&self,
&mut self,
block: ImportBlock<B>,
cache: HashMap<well_known_cache_keys::Id, Vec<u8>>,
) -> Result<ImportResult, Self::Error>;
}
impl<B: BlockT, T, E: ::std::error::Error + Send + 'static> BlockImport<B> for Arc<T>
where for<'r> &'r T: BlockImport<B, Error = E>
{
type Error = E;
fn check_block(
&mut self,
hash: B::Hash,
parent_hash: B::Hash,
) -> Result<ImportResult, Self::Error> {
(&**self).check_block(hash, parent_hash)
}
fn import_block(
&mut self,
block: ImportBlock<B>,
cache: HashMap<well_known_cache_keys::Id, Vec<u8>>,
) -> Result<ImportResult, Self::Error> {
(&**self).import_block(block, cache)
}
}
/// Justification import trait
pub trait JustificationImport<B: BlockT> {
type Error: ::std::error::Error + Send + 'static;
/// Called by the import queue when it is started. Returns a list of justifications to request
/// from the network.
fn on_start(&self) -> Vec<(B::Hash, NumberFor<B>)> { Vec::new() }
fn on_start(&mut self) -> Vec<(B::Hash, NumberFor<B>)> { Vec::new() }
/// Import a Block justification and finalize the given block.
fn import_justification(
&self,
&mut self,
hash: B::Hash,
number: NumberFor<B>,
justification: Justification,
@@ -213,11 +236,11 @@ pub trait FinalityProofImport<B: BlockT> {
/// Called by the import queue when it is started. Returns a list of finality proofs to request
/// from the network.
fn on_start(&self) -> Vec<(B::Hash, NumberFor<B>)> { Vec::new() }
fn on_start(&mut self) -> Vec<(B::Hash, NumberFor<B>)> { Vec::new() }
/// Import a Block justification and finalize the given block. Returns finalized block or error.
fn import_finality_proof(
&self,
&mut self,
hash: B::Hash,
number: NumberFor<B>,
finality_proof: Vec<u8>,
@@ -228,5 +251,5 @@ pub trait FinalityProofImport<B: BlockT> {
/// Finality proof request builder.
pub trait FinalityProofRequestBuilder<B: BlockT>: Send {
/// Build data blob, associated with the request.
fn build_request_data(&self, hash: &B::Hash) -> Vec<u8>;
fn build_request_data(&mut self, hash: &B::Hash) -> Vec<u8>;
}
@@ -39,16 +39,16 @@ mod basic_queue;
pub mod buffered_link;
/// Shared block import struct used by the queue.
pub type SharedBlockImport<B> = Arc<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>;
pub type BoxBlockImport<B> = Box<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>;
/// Shared justification import struct used by the queue.
pub type SharedJustificationImport<B> = Arc<dyn JustificationImport<B, Error=ConsensusError> + Send + Sync>;
pub type BoxJustificationImport<B> = Box<dyn JustificationImport<B, Error=ConsensusError> + Send + Sync>;
/// Shared finality proof import struct used by the queue.
pub type SharedFinalityProofImport<B> = Arc<dyn FinalityProofImport<B, Error=ConsensusError> + Send + Sync>;
pub type BoxFinalityProofImport<B> = Box<dyn FinalityProofImport<B, Error=ConsensusError> + Send + Sync>;
/// Shared finality proof request builder struct used by the queue.
pub type SharedFinalityProofRequestBuilder<B> = Arc<dyn FinalityProofRequestBuilder<B> + Send + Sync>;
pub type BoxFinalityProofRequestBuilder<B> = Box<dyn FinalityProofRequestBuilder<B> + Send + Sync>;
/// Maps to the Origin used by the network.
pub type Origin = libp2p::PeerId;
@@ -140,7 +140,7 @@ pub trait Link<B: BlockT>: Send {
/// Request a finality proof for the given block.
fn request_finality_proof(&mut self, _hash: &B::Hash, _number: NumberFor<B>) {}
/// Remember finality proof request builder on start.
fn set_finality_proof_request_builder(&mut self, _request_builder: SharedFinalityProofRequestBuilder<B>) {}
fn set_finality_proof_request_builder(&mut self, _request_builder: BoxFinalityProofRequestBuilder<B>) {}
/// Adjusts the reputation of the given peer.
fn report_peer(&mut self, _who: Origin, _reputation_change: i32) {}
/// Restart sync.
@@ -173,7 +173,7 @@ pub enum BlockImportError {
/// Single block import function.
pub fn import_single_block<B: BlockT, V: Verifier<B>>(
import_handle: &dyn BlockImport<B, Error = ConsensusError>,
import_handle: &mut dyn BlockImport<B, Error = ConsensusError>,
block_origin: BlockOrigin,
block: IncomingBlock<B>,
verifier: Arc<V>,
@@ -21,8 +21,8 @@ use runtime_primitives::{Justification, traits::{Block as BlockT, Header as Head
use crate::error::Error as ConsensusError;
use crate::block_import::{BlockImport, BlockOrigin};
use crate::import_queue::{
BlockImportResult, BlockImportError, Verifier, SharedBlockImport, SharedFinalityProofImport,
SharedFinalityProofRequestBuilder, SharedJustificationImport, ImportQueue, Link, Origin,
BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport,
BoxFinalityProofRequestBuilder, BoxJustificationImport, ImportQueue, Link, Origin,
IncomingBlock, import_single_block,
buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver}
};
@@ -44,7 +44,7 @@ pub struct BasicQueue<B: BlockT> {
/// Results coming from the worker task.
result_port: BufferedLinkReceiver<B>,
/// Sent through the link as soon as possible.
finality_proof_request_builder: Option<SharedFinalityProofRequestBuilder<B>>,
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
/// Since we have to be in a tokio context in order to spawn background tasks, we first store
/// the task to spawn here, then extract it as soon as we are in a tokio context.
/// If `Some`, contains the task to spawn in the background. If `None`, the future has already
@@ -63,10 +63,10 @@ impl<B: BlockT> BasicQueue<B> {
/// finality proof importer.
pub fn new<V: 'static + Verifier<B>>(
verifier: Arc<V>,
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>,
finality_proof_import: Option<SharedFinalityProofImport<B>>,
finality_proof_request_builder: Option<SharedFinalityProofRequestBuilder<B>>,
block_import: BoxBlockImport<B>,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let (future, worker_sender) = BlockImportWorker::new(
@@ -148,9 +148,9 @@ enum ToWorkerMsg<B: BlockT> {
struct BlockImportWorker<B: BlockT, V: Verifier<B>> {
result_sender: BufferedLinkSender<B>,
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>,
finality_proof_import: Option<SharedFinalityProofImport<B>>,
block_import: BoxBlockImport<B>,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
verifier: Arc<V>,
}
@@ -158,9 +158,9 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
fn new(
result_sender: BufferedLinkSender<B>,
verifier: Arc<V>,
block_import: SharedBlockImport<B>,
justification_import: Option<SharedJustificationImport<B>>,
finality_proof_import: Option<SharedFinalityProofImport<B>>,
block_import: BoxBlockImport<B>,
justification_import: Option<BoxJustificationImport<B>>,
finality_proof_import: Option<BoxFinalityProofImport<B>>,
) -> (impl Future<Item = (), Error = ()> + Send, mpsc::UnboundedSender<ToWorkerMsg<B>>) {
let (sender, mut port) = mpsc::unbounded();
@@ -172,13 +172,13 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
finality_proof_import,
};
if let Some(justification_import) = worker.justification_import.as_ref() {
if let Some(justification_import) = worker.justification_import.as_mut() {
for (hash, number) in justification_import.on_start() {
worker.result_sender.request_justification(&hash, number);
}
}
if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() {
if let Some(finality_proof_import) = worker.finality_proof_import.as_mut() {
for (hash, number) in finality_proof_import.on_start() {
worker.result_sender.request_finality_proof(&hash, number);
}
@@ -211,7 +211,7 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
let (imported, count, results) = import_many_blocks(
&*self.block_import,
&mut *self.block_import,
origin,
blocks,
self.verifier.clone(),
@@ -295,8 +295,9 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
}
fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
let result = self.finality_proof_import.as_ref().map(|finality_proof_import| {
finality_proof_import.import_finality_proof(hash, number, finality_proof, &*self.verifier)
let verifier = &*self.verifier;
let result = self.finality_proof_import.as_mut().map(|finality_proof_import| {
finality_proof_import.import_finality_proof(hash, number, finality_proof, verifier)
.map_err(|e| {
debug!(
"Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}",
@@ -319,7 +320,7 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
number: NumberFor<B>,
justification: Justification
) {
let success = self.justification_import.as_ref().map(|justification_import| {
let success = self.justification_import.as_mut().map(|justification_import| {
justification_import.import_justification(hash, number, justification)
.map_err(|e| {
debug!(
@@ -340,7 +341,7 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
/// Import several blocks at once, returning import result for each block.
fn import_many_blocks<B: BlockT, V: Verifier<B>>(
import_handle: &dyn BlockImport<B, Error = ConsensusError>,
import_handle: &mut dyn BlockImport<B, Error = ConsensusError>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: Arc<V>,
@@ -34,7 +34,7 @@
use futures::{prelude::*, sync::mpsc};
use runtime_primitives::traits::{Block as BlockT, NumberFor};
use crate::import_queue::{Origin, Link, SharedFinalityProofRequestBuilder};
use crate::import_queue::{Origin, Link, BoxFinalityProofRequestBuilder};
/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
@@ -60,7 +60,7 @@ enum BlockImportWorkerMsg<B: BlockT> {
RequestJustification(B::Hash, NumberFor<B>),
FinalityProofImported(Origin, (B::Hash, NumberFor<B>), Result<(B::Hash, NumberFor<B>), ()>),
RequestFinalityProof(B::Hash, NumberFor<B>),
SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder<B>),
SetFinalityProofRequestBuilder(BoxFinalityProofRequestBuilder<B>),
ReportPeer(Origin, i32),
Restart,
}
@@ -107,7 +107,7 @@ impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestFinalityProof(hash.clone(), number));
}
fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder<B>) {
fn set_finality_proof_request_builder(&mut self, request_builder: BoxFinalityProofRequestBuilder<B>) {
let _ = self.tx.unbounded_send(BlockImportWorkerMsg::SetFinalityProofRequestBuilder(request_builder));
}