mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-14 09:51:10 +00:00
Synchronous import queue + fix async inport queue shutdown (#2701)
* sync implementation of ImportQueue * fix import queue shutdown * never clone import queue
This commit is contained in:
committed by
Gavin Wood
parent
549d9e1da1
commit
48b2ba041f
Generated
+1
@@ -3999,6 +3999,7 @@ dependencies = [
|
|||||||
"libp2p 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"libp2p 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"parity-codec 3.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
"parity-codec 3.5.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"sr-primitives 2.0.0",
|
"sr-primitives 2.0.0",
|
||||||
"sr-std 2.0.0",
|
"sr-std 2.0.0",
|
||||||
"sr-version 2.0.0",
|
"sr-version 2.0.0",
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ runtime_version = { package = "sr-version", path = "../../sr-version" }
|
|||||||
runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" }
|
runtime_primitives = { package = "sr-primitives", path = "../../sr-primitives" }
|
||||||
tokio-timer = "0.2"
|
tokio-timer = "0.2"
|
||||||
parity-codec = { version = "3.3", features = ["derive"] }
|
parity-codec = { version = "3.3", features = ["derive"] }
|
||||||
|
parking_lot = "0.7.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
test_client = { package = "substrate-test-client", path = "../../test-client" }
|
test_client = { package = "substrate-test-client", path = "../../test-client" }
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ use crate::block_import::{
|
|||||||
};
|
};
|
||||||
use crossbeam_channel::{self as channel, Receiver, Sender};
|
use crossbeam_channel::{self as channel, Receiver, Sender};
|
||||||
use parity_codec::Encode;
|
use parity_codec::Encode;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
@@ -97,7 +98,7 @@ pub trait Verifier<B: BlockT>: Send + Sync {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Blocks import queue API.
|
/// Blocks import queue API.
|
||||||
pub trait ImportQueue<B: BlockT>: Send + Sync + ImportQueueClone<B> {
|
pub trait ImportQueue<B: BlockT>: Send + Sync {
|
||||||
/// Start background work for the queue as necessary.
|
/// Start background work for the queue as necessary.
|
||||||
///
|
///
|
||||||
/// This is called automatically by the network service when synchronization
|
/// This is called automatically by the network service when synchronization
|
||||||
@@ -105,8 +106,6 @@ pub trait ImportQueue<B: BlockT>: Send + Sync + ImportQueueClone<B> {
|
|||||||
fn start(&self, _link: Box<Link<B>>) -> Result<(), std::io::Error> {
|
fn start(&self, _link: Box<Link<B>>) -> Result<(), std::io::Error> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
/// Clears the import queue and stops importing.
|
|
||||||
fn stop(&self);
|
|
||||||
/// Import bunch of blocks.
|
/// Import bunch of blocks.
|
||||||
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
|
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
|
||||||
/// Import a block justification.
|
/// Import a block justification.
|
||||||
@@ -115,13 +114,96 @@ pub trait ImportQueue<B: BlockT>: Send + Sync + ImportQueueClone<B> {
|
|||||||
fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>);
|
fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait ImportQueueClone<B: BlockT> {
|
/// Basic block import queue that performs import in the caller thread.
|
||||||
fn clone_box(&self) -> Box<ImportQueue<B>>;
|
pub struct BasicSyncQueue<B: BlockT, V: Verifier<B>> {
|
||||||
|
data: Arc<BasicSyncQueueData<B, V>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: BlockT> Clone for Box<ImportQueue<B>> {
|
struct BasicSyncQueueData<B: BlockT, V: Verifier<B>> {
|
||||||
fn clone(&self) -> Box<ImportQueue<B>> {
|
link: Mutex<Option<Box<Link<B>>>>,
|
||||||
self.clone_box()
|
block_import: SharedBlockImport<B>,
|
||||||
|
verifier: Arc<V>,
|
||||||
|
justification_import: Option<SharedJustificationImport<B>>,
|
||||||
|
finality_proof_import: Option<SharedFinalityProofImport<B>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlockT, V: Verifier<B>> BasicSyncQueue<B, V> {
|
||||||
|
pub fn new(
|
||||||
|
block_import: SharedBlockImport<B>,
|
||||||
|
verifier: Arc<V>,
|
||||||
|
justification_import: Option<SharedJustificationImport<B>>,
|
||||||
|
finality_proof_import: Option<SharedFinalityProofImport<B>>,
|
||||||
|
) -> Self {
|
||||||
|
BasicSyncQueue {
|
||||||
|
data: Arc::new(BasicSyncQueueData {
|
||||||
|
link: Mutex::new(None),
|
||||||
|
block_import,
|
||||||
|
verifier,
|
||||||
|
justification_import,
|
||||||
|
finality_proof_import,
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicSyncQueue<B, V> {
|
||||||
|
fn start(&self, link: Box<Link<B>>) -> Result<(), std::io::Error> {
|
||||||
|
if let Some(justification_import) = self.data.justification_import.as_ref() {
|
||||||
|
justification_import.on_start(&*link);
|
||||||
|
}
|
||||||
|
*self.data.link.lock() = Some(link);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
|
||||||
|
if blocks.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (imported, count, results) = import_many_blocks(
|
||||||
|
&*self.data.block_import,
|
||||||
|
origin,
|
||||||
|
blocks,
|
||||||
|
self.data.verifier.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let link_ref = self.data.link.lock();
|
||||||
|
let link = match link_ref.as_ref() {
|
||||||
|
Some(link) => link,
|
||||||
|
None => {
|
||||||
|
trace!(target: "sync", "Trying to import blocks before starting import queue");
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
process_import_results(&**link, results);
|
||||||
|
|
||||||
|
trace!(target: "sync", "Imported {} of {}", imported, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, justification: Justification) {
|
||||||
|
import_single_justification(
|
||||||
|
&*self.data.link.lock(),
|
||||||
|
&self.data.justification_import,
|
||||||
|
who,
|
||||||
|
hash,
|
||||||
|
number,
|
||||||
|
justification,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
|
||||||
|
let result = import_single_finality_proof(
|
||||||
|
&self.data.finality_proof_import,
|
||||||
|
&*self.data.verifier,
|
||||||
|
&who,
|
||||||
|
hash,
|
||||||
|
number,
|
||||||
|
finality_proof,
|
||||||
|
);
|
||||||
|
if let Some(link) = self.data.link.lock().as_ref() {
|
||||||
|
link.finality_proof_imported(who, (hash, number), result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,16 +211,20 @@ impl<B: BlockT> Clone for Box<ImportQueue<B>> {
|
|||||||
/// sequentially in a separate thread, with pluggable verification.
|
/// sequentially in a separate thread, with pluggable verification.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct BasicQueue<B: BlockT> {
|
pub struct BasicQueue<B: BlockT> {
|
||||||
sender: Sender<BlockImportMsg<B>>,
|
sender: Option<Sender<BlockImportMsg<B>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: BlockT> ImportQueueClone<B> for BasicQueue<B> {
|
impl<B: BlockT> Drop for BasicQueue<B> {
|
||||||
fn clone_box(&self) -> Box<ImportQueue<B>> {
|
fn drop(&mut self) {
|
||||||
Box::new(self.clone())
|
if let Some(sender) = self.sender.take() {
|
||||||
|
let (shutdown_sender, shutdown_receiver) = channel::unbounded();
|
||||||
|
if sender.send(BlockImportMsg::Shutdown(shutdown_sender)).is_ok() {
|
||||||
|
let _ = shutdown_receiver.recv();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/// "BasicQueue" is a wrapper around a channel sender to the "BlockImporter".
|
/// "BasicQueue" is a wrapper around a channel sender to the "BlockImporter".
|
||||||
/// "BasicQueue" itself does not keep any state or do any importing work, and
|
/// "BasicQueue" itself does not keep any state or do any importing work, and
|
||||||
/// can therefore be send to other threads.
|
/// can therefore be send to other threads.
|
||||||
@@ -184,7 +270,7 @@ impl<B: BlockT> BasicQueue<B> {
|
|||||||
);
|
);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
sender: importer_sender,
|
sender: Some(importer_sender),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,52 +280,47 @@ impl<B: BlockT> BasicQueue<B> {
|
|||||||
/// has synchronized with ImportQueue.
|
/// has synchronized with ImportQueue.
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
pub fn synchronize(&self) {
|
pub fn synchronize(&self) {
|
||||||
self
|
if let Some(ref sender) = self.sender {
|
||||||
.sender
|
let _ = sender.send(BlockImportMsg::Synchronize);
|
||||||
.send(BlockImportMsg::Synchronize)
|
}
|
||||||
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
|
impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
|
||||||
fn start(&self, link: Box<Link<B>>) -> Result<(), std::io::Error> {
|
fn start(&self, link: Box<Link<B>>) -> Result<(), std::io::Error> {
|
||||||
let (sender, port) = channel::unbounded();
|
let connect_err = || Err(std::io::Error::new(
|
||||||
let _ = self
|
std::io::ErrorKind::Other,
|
||||||
.sender
|
"Failed to connect import queue threads",
|
||||||
.send(BlockImportMsg::Start(link, sender))
|
));
|
||||||
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
|
if let Some(ref sender) = self.sender {
|
||||||
port.recv().expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed")
|
let (start_sender, start_port) = channel::unbounded();
|
||||||
}
|
let _ = sender.send(BlockImportMsg::Start(link, start_sender));
|
||||||
|
start_port.recv().unwrap_or_else(|_| connect_err())
|
||||||
fn stop(&self) {
|
} else {
|
||||||
let _ = self
|
connect_err()
|
||||||
.sender
|
}
|
||||||
.send(BlockImportMsg::Stop)
|
|
||||||
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
|
fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
|
||||||
if blocks.is_empty() {
|
if blocks.is_empty() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let _ = self
|
|
||||||
.sender
|
if let Some(ref sender) = self.sender {
|
||||||
.send(BlockImportMsg::ImportBlocks(origin, blocks))
|
let _ = sender.send(BlockImportMsg::ImportBlocks(origin, blocks));
|
||||||
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, justification: Justification) {
|
fn import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, justification: Justification) {
|
||||||
let _ = self
|
if let Some(ref sender) = self.sender {
|
||||||
.sender
|
let _ = sender.send(BlockImportMsg::ImportJustification(who.clone(), hash, number, justification));
|
||||||
.send(BlockImportMsg::ImportJustification(who.clone(), hash, number, justification))
|
}
|
||||||
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
|
fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
|
||||||
let _ = self
|
if let Some(ref sender) = self.sender {
|
||||||
.sender
|
let _ = sender.send(BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof));
|
||||||
.send(BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof))
|
}
|
||||||
.expect("1. self is holding a sender to the Importer, 2. Importer should handle messages while there are senders around; qed");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,12 +329,12 @@ pub enum BlockImportMsg<B: BlockT> {
|
|||||||
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
|
ImportJustification(Origin, B::Hash, NumberFor<B>, Justification),
|
||||||
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
|
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
|
||||||
Start(Box<Link<B>>, Sender<Result<(), std::io::Error>>),
|
Start(Box<Link<B>>, Sender<Result<(), std::io::Error>>),
|
||||||
Stop,
|
Shutdown(Sender<()>),
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
Synchronize,
|
Synchronize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg_attr(test, derive(Debug, PartialEq))]
|
#[cfg_attr(test, derive(Debug))]
|
||||||
pub enum BlockImportWorkerMsg<B: BlockT> {
|
pub enum BlockImportWorkerMsg<B: BlockT> {
|
||||||
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
|
ImportBlocks(BlockOrigin, Vec<IncomingBlock<B>>),
|
||||||
ImportedBlocks(
|
ImportedBlocks(
|
||||||
@@ -264,6 +345,7 @@ pub enum BlockImportWorkerMsg<B: BlockT> {
|
|||||||
),
|
),
|
||||||
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
|
ImportFinalityProof(Origin, B::Hash, NumberFor<B>, Vec<u8>),
|
||||||
ImportedFinalityProof(Origin, (B::Hash, NumberFor<B>), Result<(B::Hash, NumberFor<B>), ()>),
|
ImportedFinalityProof(Origin, (B::Hash, NumberFor<B>), Result<(B::Hash, NumberFor<B>), ()>),
|
||||||
|
Shutdown(Sender<()>),
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
Synchronize,
|
Synchronize,
|
||||||
}
|
}
|
||||||
@@ -276,7 +358,7 @@ enum ImportMsgType<B: BlockT> {
|
|||||||
struct BlockImporter<B: BlockT> {
|
struct BlockImporter<B: BlockT> {
|
||||||
port: Receiver<BlockImportMsg<B>>,
|
port: Receiver<BlockImportMsg<B>>,
|
||||||
result_port: Receiver<BlockImportWorkerMsg<B>>,
|
result_port: Receiver<BlockImportWorkerMsg<B>>,
|
||||||
worker_sender: Sender<BlockImportWorkerMsg<B>>,
|
worker_sender: Option<Sender<BlockImportWorkerMsg<B>>>,
|
||||||
link: Option<Box<dyn Link<B>>>,
|
link: Option<Box<dyn Link<B>>>,
|
||||||
verifier: Arc<Verifier<B>>,
|
verifier: Arc<Verifier<B>>,
|
||||||
justification_import: Option<SharedJustificationImport<B>>,
|
justification_import: Option<SharedJustificationImport<B>>,
|
||||||
@@ -301,7 +383,7 @@ impl<B: BlockT> BlockImporter<B> {
|
|||||||
let mut importer = BlockImporter {
|
let mut importer = BlockImporter {
|
||||||
port,
|
port,
|
||||||
result_port,
|
result_port,
|
||||||
worker_sender,
|
worker_sender: Some(worker_sender),
|
||||||
link: None,
|
link: None,
|
||||||
verifier,
|
verifier,
|
||||||
justification_import,
|
justification_import,
|
||||||
@@ -345,7 +427,14 @@ impl<B: BlockT> BlockImporter<B> {
|
|||||||
self.handle_import_blocks(origin, incoming_blocks)
|
self.handle_import_blocks(origin, incoming_blocks)
|
||||||
},
|
},
|
||||||
BlockImportMsg::ImportJustification(who, hash, number, justification) => {
|
BlockImportMsg::ImportJustification(who, hash, number, justification) => {
|
||||||
self.handle_import_justification(who, hash, number, justification)
|
import_single_justification(
|
||||||
|
&self.link,
|
||||||
|
&self.justification_import,
|
||||||
|
who,
|
||||||
|
hash,
|
||||||
|
number,
|
||||||
|
justification,
|
||||||
|
);
|
||||||
},
|
},
|
||||||
BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof) => {
|
BlockImportMsg::ImportFinalityProof(who, hash, number, finality_proof) => {
|
||||||
self.handle_import_finality_proof(who, hash, number, finality_proof)
|
self.handle_import_finality_proof(who, hash, number, finality_proof)
|
||||||
@@ -363,13 +452,24 @@ impl<B: BlockT> BlockImporter<B> {
|
|||||||
self.link = Some(link);
|
self.link = Some(link);
|
||||||
let _ = sender.send(Ok(()));
|
let _ = sender.send(Ok(()));
|
||||||
},
|
},
|
||||||
BlockImportMsg::Stop => return false,
|
BlockImportMsg::Shutdown(result_sender) => {
|
||||||
|
// stop worker thread
|
||||||
|
if let Some(worker_sender) = self.worker_sender.take() {
|
||||||
|
let (sender, receiver) = channel::unbounded();
|
||||||
|
if worker_sender.send(BlockImportWorkerMsg::Shutdown(sender)).is_ok() {
|
||||||
|
let _ = receiver.recv();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// send shutdown notification
|
||||||
|
let _ = result_sender.send(());
|
||||||
|
return false;
|
||||||
|
},
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
BlockImportMsg::Synchronize => {
|
BlockImportMsg::Synchronize => {
|
||||||
trace!(target: "sync", "Received synchronization message");
|
trace!(target: "sync", "Received synchronization message");
|
||||||
self.worker_sender
|
if let Some(ref worker_sender) = self.worker_sender {
|
||||||
.send(BlockImportWorkerMsg::Synchronize)
|
let _ = worker_sender.send(BlockImportWorkerMsg::Synchronize);
|
||||||
.expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed");
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
true
|
true
|
||||||
@@ -398,106 +498,26 @@ impl<B: BlockT> BlockImporter<B> {
|
|||||||
},
|
},
|
||||||
BlockImportWorkerMsg::ImportBlocks(_, _)
|
BlockImportWorkerMsg::ImportBlocks(_, _)
|
||||||
| BlockImportWorkerMsg::ImportFinalityProof(_, _, _, _)
|
| BlockImportWorkerMsg::ImportFinalityProof(_, _, _, _)
|
||||||
=> unreachable!("Import Worker does not send Import* message; qed"),
|
| BlockImportWorkerMsg::Shutdown(_)
|
||||||
|
=> unreachable!("Import Worker does not send Import*/Shutdown messages; qed"),
|
||||||
};
|
};
|
||||||
let mut has_error = false;
|
|
||||||
let mut hashes = vec![];
|
|
||||||
for (result, hash) in results {
|
|
||||||
hashes.push(hash);
|
|
||||||
|
|
||||||
if has_error {
|
process_import_results(&**link, results);
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if result.is_err() {
|
|
||||||
has_error = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number),
|
|
||||||
Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => {
|
|
||||||
link.block_imported(&hash, number);
|
|
||||||
|
|
||||||
if aux.clear_justification_requests {
|
|
||||||
trace!(target: "sync", "Block imported clears all pending justification requests {}: {:?}", number, hash);
|
|
||||||
link.clear_justification_requests();
|
|
||||||
}
|
|
||||||
|
|
||||||
if aux.needs_justification {
|
|
||||||
trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash);
|
|
||||||
link.request_justification(&hash, number);
|
|
||||||
}
|
|
||||||
|
|
||||||
if aux.bad_justification {
|
|
||||||
if let Some(peer) = who {
|
|
||||||
info!("Sent block with bad justification to import");
|
|
||||||
link.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if aux.needs_finality_proof {
|
|
||||||
trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash);
|
|
||||||
link.request_finality_proof(&hash, number);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(BlockImportError::IncompleteHeader(who)) => {
|
|
||||||
if let Some(peer) = who {
|
|
||||||
info!("Peer sent block with incomplete header to import");
|
|
||||||
link.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE);
|
|
||||||
link.restart();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(BlockImportError::VerificationFailed(who, e)) => {
|
|
||||||
if let Some(peer) = who {
|
|
||||||
info!("Verification failed from peer: {}", e);
|
|
||||||
link.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE);
|
|
||||||
link.restart();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(BlockImportError::BadBlock(who)) => {
|
|
||||||
if let Some(peer) = who {
|
|
||||||
info!("Bad block");
|
|
||||||
link.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE);
|
|
||||||
link.restart();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => {
|
|
||||||
link.restart();
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
if let Some(link) = self.link.as_ref() {
|
|
||||||
link.blocks_processed(hashes, has_error);
|
|
||||||
}
|
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_import_justification(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, justification: Justification) {
|
fn handle_import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
|
||||||
let success = self.justification_import.as_ref().map(|justification_import| {
|
if let Some(ref worker_sender) = self.worker_sender {
|
||||||
justification_import.import_justification(hash, number, justification)
|
trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
|
||||||
.map_err(|e| {
|
let _ = worker_sender.send(BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof));
|
||||||
debug!(target: "sync", "Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", e, hash, number, who);
|
|
||||||
e
|
|
||||||
}).is_ok()
|
|
||||||
}).unwrap_or(false);
|
|
||||||
|
|
||||||
if let Some(link) = self.link.as_ref() {
|
|
||||||
link.justification_imported(who, &hash, number, success);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
|
|
||||||
trace!(target: "sync", "Scheduling finality proof of {}/{} for import", number, hash);
|
|
||||||
self.worker_sender
|
|
||||||
.send(BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, finality_proof))
|
|
||||||
.expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed");
|
|
||||||
}
|
|
||||||
|
|
||||||
fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
|
fn handle_import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
|
||||||
trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
|
if let Some(ref worker_sender) = self.worker_sender {
|
||||||
self.worker_sender
|
trace!(target: "sync", "Scheduling {} blocks for import", blocks.len());
|
||||||
.send(BlockImportWorkerMsg::ImportBlocks(origin, blocks))
|
let _ = worker_sender.send(BlockImportWorkerMsg::ImportBlocks(origin, blocks));
|
||||||
.expect("1. This is holding a sender to the worker, 2. the worker should not quit while a sender is still held; qed");
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -534,6 +554,10 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
|
|||||||
BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
|
BlockImportWorkerMsg::ImportFinalityProof(who, hash, number, proof) => {
|
||||||
worker.import_finality_proof(who, hash, number, proof);
|
worker.import_finality_proof(who, hash, number, proof);
|
||||||
},
|
},
|
||||||
|
BlockImportWorkerMsg::Shutdown(result_sender) => {
|
||||||
|
let _ = result_sender.send(());
|
||||||
|
break;
|
||||||
|
},
|
||||||
#[cfg(any(test, feature = "test-helpers"))]
|
#[cfg(any(test, feature = "test-helpers"))]
|
||||||
BlockImportWorkerMsg::Synchronize => {
|
BlockImportWorkerMsg::Synchronize => {
|
||||||
trace!(target: "sync", "Sending sync message");
|
trace!(target: "sync", "Sending sync message");
|
||||||
@@ -550,44 +574,12 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn import_a_batch_of_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
|
fn import_a_batch_of_blocks(&self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
|
||||||
let count = blocks.len();
|
let (imported, count, results) = import_many_blocks(
|
||||||
let mut imported = 0;
|
&*self.block_import,
|
||||||
|
origin,
|
||||||
let blocks_range = match (
|
blocks,
|
||||||
blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
|
self.verifier.clone(),
|
||||||
blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
|
);
|
||||||
) {
|
|
||||||
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
|
|
||||||
(Some(first), Some(_)) => format!(" ({})", first),
|
|
||||||
_ => Default::default(),
|
|
||||||
};
|
|
||||||
|
|
||||||
trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range);
|
|
||||||
|
|
||||||
let mut results = vec![];
|
|
||||||
|
|
||||||
let mut has_error = false;
|
|
||||||
|
|
||||||
// Blocks in the response/drain should be in ascending order.
|
|
||||||
for block in blocks {
|
|
||||||
let import_result = if has_error {
|
|
||||||
Err(BlockImportError::Error)
|
|
||||||
} else {
|
|
||||||
import_single_block(
|
|
||||||
&*self.block_import,
|
|
||||||
origin.clone(),
|
|
||||||
block.clone(),
|
|
||||||
self.verifier.clone(),
|
|
||||||
)
|
|
||||||
};
|
|
||||||
let was_ok = import_result.is_ok();
|
|
||||||
results.push((import_result, block.hash));
|
|
||||||
if was_ok {
|
|
||||||
imported += 1;
|
|
||||||
} else {
|
|
||||||
has_error = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let _ = self
|
let _ = self
|
||||||
.result_sender
|
.result_sender
|
||||||
@@ -597,24 +589,18 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
|
fn import_finality_proof(&self, who: Origin, hash: B::Hash, number: NumberFor<B>, finality_proof: Vec<u8>) {
|
||||||
let result = self.finality_proof_import.as_ref().map(|finality_proof_import| {
|
let result = import_single_finality_proof(
|
||||||
finality_proof_import.import_finality_proof(hash, number, finality_proof, &*self.verifier)
|
&self.finality_proof_import,
|
||||||
.map_err(|e| {
|
&*self.verifier,
|
||||||
debug!(
|
&who,
|
||||||
"Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}",
|
hash,
|
||||||
e,
|
number,
|
||||||
hash,
|
finality_proof,
|
||||||
number,
|
);
|
||||||
who,
|
|
||||||
);
|
|
||||||
})
|
|
||||||
}).unwrap_or(Err(()));
|
|
||||||
|
|
||||||
let _ = self
|
let _ = self
|
||||||
.result_sender
|
.result_sender
|
||||||
.send(BlockImportWorkerMsg::ImportedFinalityProof(who, (hash, number), result));
|
.send(BlockImportWorkerMsg::ImportedFinalityProof(who, (hash, number), result));
|
||||||
|
|
||||||
trace!(target: "sync", "Imported finality proof for {}/{}", number, hash);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -679,6 +665,193 @@ pub enum BlockImportError {
|
|||||||
Error,
|
Error,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Imports single notification and send notification to the link (if provided).
|
||||||
|
fn import_single_justification<B: BlockT>(
|
||||||
|
link: &Option<Box<Link<B>>>,
|
||||||
|
justification_import: &Option<SharedJustificationImport<B>>,
|
||||||
|
who: Origin,
|
||||||
|
hash: B::Hash,
|
||||||
|
number: NumberFor<B>,
|
||||||
|
justification: Justification,
|
||||||
|
) {
|
||||||
|
let success = justification_import.as_ref().map(|justification_import| {
|
||||||
|
justification_import.import_justification(hash, number, justification)
|
||||||
|
.map_err(|e| {
|
||||||
|
debug!(
|
||||||
|
target: "sync",
|
||||||
|
"Justification import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}",
|
||||||
|
e,
|
||||||
|
hash,
|
||||||
|
number,
|
||||||
|
who,
|
||||||
|
);
|
||||||
|
e
|
||||||
|
}).is_ok()
|
||||||
|
}).unwrap_or(false);
|
||||||
|
|
||||||
|
if let Some(ref link) = link {
|
||||||
|
link.justification_imported(who, &hash, number, success);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Imports single finality_proof.
|
||||||
|
fn import_single_finality_proof<B: BlockT, V: Verifier<B>>(
|
||||||
|
finality_proof_import: &Option<SharedFinalityProofImport<B>>,
|
||||||
|
verifier: &V,
|
||||||
|
who: &Origin,
|
||||||
|
hash: B::Hash,
|
||||||
|
number: NumberFor<B>,
|
||||||
|
finality_proof: Vec<u8>,
|
||||||
|
) -> Result<(B::Hash, NumberFor<B>), ()> {
|
||||||
|
let result = finality_proof_import.as_ref().map(|finality_proof_import| {
|
||||||
|
finality_proof_import.import_finality_proof(hash, number, finality_proof, verifier)
|
||||||
|
.map_err(|e| {
|
||||||
|
debug!(
|
||||||
|
"Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}",
|
||||||
|
e,
|
||||||
|
hash,
|
||||||
|
number,
|
||||||
|
who,
|
||||||
|
);
|
||||||
|
})
|
||||||
|
}).unwrap_or(Err(()));
|
||||||
|
|
||||||
|
trace!(target: "sync", "Imported finality proof for {}/{}", number, hash);
|
||||||
|
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process result of block(s) import.
|
||||||
|
fn process_import_results<B: BlockT>(
|
||||||
|
link: &Link<B>,
|
||||||
|
results: Vec<(
|
||||||
|
Result<BlockImportResult<NumberFor<B>>, BlockImportError>,
|
||||||
|
B::Hash,
|
||||||
|
)>,
|
||||||
|
)
|
||||||
|
{
|
||||||
|
let mut has_error = false;
|
||||||
|
let mut hashes = vec![];
|
||||||
|
for (result, hash) in results {
|
||||||
|
hashes.push(hash);
|
||||||
|
|
||||||
|
if has_error {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if result.is_err() {
|
||||||
|
has_error = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number),
|
||||||
|
Ok(BlockImportResult::ImportedUnknown(number, aux, who)) => {
|
||||||
|
link.block_imported(&hash, number);
|
||||||
|
|
||||||
|
if aux.clear_justification_requests {
|
||||||
|
trace!(target: "sync", "Block imported clears all pending justification requests {}: {:?}", number, hash);
|
||||||
|
link.clear_justification_requests();
|
||||||
|
}
|
||||||
|
|
||||||
|
if aux.needs_justification {
|
||||||
|
trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash);
|
||||||
|
link.request_justification(&hash, number);
|
||||||
|
}
|
||||||
|
|
||||||
|
if aux.bad_justification {
|
||||||
|
if let Some(peer) = who {
|
||||||
|
info!("Sent block with bad justification to import");
|
||||||
|
link.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if aux.needs_finality_proof {
|
||||||
|
trace!(target: "sync", "Block imported but requires finality proof {}: {:?}", number, hash);
|
||||||
|
link.request_finality_proof(&hash, number);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(BlockImportError::IncompleteHeader(who)) => {
|
||||||
|
if let Some(peer) = who {
|
||||||
|
info!("Peer sent block with incomplete header to import");
|
||||||
|
link.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE);
|
||||||
|
link.restart();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(BlockImportError::VerificationFailed(who, e)) => {
|
||||||
|
if let Some(peer) = who {
|
||||||
|
info!("Verification failed from peer: {}", e);
|
||||||
|
link.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE);
|
||||||
|
link.restart();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(BlockImportError::BadBlock(who)) => {
|
||||||
|
if let Some(peer) = who {
|
||||||
|
info!("Bad block");
|
||||||
|
link.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE);
|
||||||
|
link.restart();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => {
|
||||||
|
link.restart();
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
link.blocks_processed(hashes, has_error);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Import several blocks at once, returning import result for each block.
|
||||||
|
fn import_many_blocks<B: BlockT, V: Verifier<B>>(
|
||||||
|
import_handle: &BlockImport<B, Error = ConsensusError>,
|
||||||
|
blocks_origin: BlockOrigin,
|
||||||
|
blocks: Vec<IncomingBlock<B>>,
|
||||||
|
verifier: Arc<V>,
|
||||||
|
) -> (usize, usize, Vec<(
|
||||||
|
Result<BlockImportResult<NumberFor<B>>, BlockImportError>,
|
||||||
|
B::Hash,
|
||||||
|
)>) {
|
||||||
|
let count = blocks.len();
|
||||||
|
let mut imported = 0;
|
||||||
|
|
||||||
|
let blocks_range = match (
|
||||||
|
blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
|
||||||
|
blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
|
||||||
|
) {
|
||||||
|
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
|
||||||
|
(Some(first), Some(_)) => format!(" ({})", first),
|
||||||
|
_ => Default::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
trace!(target: "sync", "Starting import of {} blocks {}", count, blocks_range);
|
||||||
|
|
||||||
|
let mut results = vec![];
|
||||||
|
|
||||||
|
let mut has_error = false;
|
||||||
|
|
||||||
|
// Blocks in the response/drain should be in ascending order.
|
||||||
|
for block in blocks {
|
||||||
|
let block_hash = block.hash;
|
||||||
|
let import_result = if has_error {
|
||||||
|
Err(BlockImportError::Error)
|
||||||
|
} else {
|
||||||
|
import_single_block(
|
||||||
|
import_handle,
|
||||||
|
blocks_origin.clone(),
|
||||||
|
block,
|
||||||
|
verifier.clone(),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
let was_ok = import_result.is_ok();
|
||||||
|
results.push((import_result, block_hash));
|
||||||
|
if was_ok {
|
||||||
|
imported += 1;
|
||||||
|
} else {
|
||||||
|
has_error = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
(imported, count, results)
|
||||||
|
}
|
||||||
|
|
||||||
/// Single block import function.
|
/// Single block import function.
|
||||||
pub fn import_single_block<B: BlockT, V: Verifier<B>>(
|
pub fn import_single_block<B: BlockT, V: Verifier<B>>(
|
||||||
import_handle: &BlockImport<B, Error = ConsensusError>,
|
import_handle: &BlockImport<B, Error = ConsensusError>,
|
||||||
@@ -909,12 +1082,19 @@ mod tests {
|
|||||||
)).unwrap();
|
)).unwrap();
|
||||||
|
|
||||||
// Wait until this request is redirected to the BlockImportWorker
|
// Wait until this request is redirected to the BlockImportWorker
|
||||||
assert_eq!(worker_receiver.recv(), Ok(BlockImportWorkerMsg::ImportFinalityProof(
|
match worker_receiver.recv().unwrap() {
|
||||||
who.clone(),
|
BlockImportWorkerMsg::ImportFinalityProof(
|
||||||
Default::default(),
|
cwho,
|
||||||
1,
|
chash,
|
||||||
vec![42],
|
1,
|
||||||
)));
|
cproof,
|
||||||
|
) => {
|
||||||
|
assert_eq!(cwho, who);
|
||||||
|
assert_eq!(chash, Default::default());
|
||||||
|
assert_eq!(cproof, vec![42]);
|
||||||
|
},
|
||||||
|
_ => unreachable!("Unexpected work request received"),
|
||||||
|
}
|
||||||
|
|
||||||
// Send ack of proof import from BlockImportWorker to BlockImporter
|
// Send ack of proof import from BlockImportWorker to BlockImporter
|
||||||
result_sender.send(BlockImportWorkerMsg::ImportedFinalityProof(
|
result_sender.send(BlockImportWorkerMsg::ImportedFinalityProof(
|
||||||
|
|||||||
@@ -211,6 +211,14 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
|||||||
let (network_chan, network_port) = mpsc::unbounded();
|
let (network_chan, network_port) = mpsc::unbounded();
|
||||||
let (protocol_sender, protocol_rx) = mpsc::unbounded();
|
let (protocol_sender, protocol_rx) = mpsc::unbounded();
|
||||||
let status_sinks = Arc::new(Mutex::new(Vec::new()));
|
let status_sinks = Arc::new(Mutex::new(Vec::new()));
|
||||||
|
|
||||||
|
// connect the import-queue to the network service.
|
||||||
|
let link = NetworkLink {
|
||||||
|
protocol_sender: protocol_sender.clone(),
|
||||||
|
network_sender: network_chan.clone(),
|
||||||
|
};
|
||||||
|
import_queue.start(Box::new(link))?;
|
||||||
|
|
||||||
// Start in off-line mode, since we're not connected to any nodes yet.
|
// Start in off-line mode, since we're not connected to any nodes yet.
|
||||||
let is_offline = Arc::new(AtomicBool::new(true));
|
let is_offline = Arc::new(AtomicBool::new(true));
|
||||||
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
let is_major_syncing = Arc::new(AtomicBool::new(false));
|
||||||
@@ -229,7 +237,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
|||||||
is_major_syncing.clone(),
|
is_major_syncing.clone(),
|
||||||
protocol,
|
protocol,
|
||||||
peers.clone(),
|
peers.clone(),
|
||||||
import_queue.clone(),
|
import_queue,
|
||||||
params.transaction_pool,
|
params.transaction_pool,
|
||||||
params.finality_proof_provider,
|
params.finality_proof_provider,
|
||||||
network_port,
|
network_port,
|
||||||
@@ -244,22 +252,14 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> {
|
|||||||
status_sinks,
|
status_sinks,
|
||||||
is_offline,
|
is_offline,
|
||||||
is_major_syncing,
|
is_major_syncing,
|
||||||
network_chan: network_chan.clone(),
|
network_chan,
|
||||||
peers,
|
peers,
|
||||||
peerset,
|
peerset,
|
||||||
network,
|
network,
|
||||||
protocol_sender: protocol_sender.clone(),
|
protocol_sender,
|
||||||
bg_thread: Some(thread),
|
bg_thread: Some(thread),
|
||||||
});
|
});
|
||||||
|
|
||||||
// connect the import-queue to the network service.
|
|
||||||
let link = NetworkLink {
|
|
||||||
protocol_sender,
|
|
||||||
network_sender: network_chan,
|
|
||||||
};
|
|
||||||
|
|
||||||
import_queue.start(Box::new(link))?;
|
|
||||||
|
|
||||||
Ok(service)
|
Ok(service)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user