Fix --import-blocks (#1807)

* Encode count of exported blocks correctly

There was a type mismatch: import used u32, export
used Number.

* Wait for import to finish

The issue was that even though the import thread
was still running, the main thread exited.

* Remove superfluous parentheses

* Improve structure, add proofs for expects

* Unify types for export/import length
This commit is contained in:
Michael Müller
2019-02-15 15:59:53 +01:00
committed by Sergei Pepyakin
parent 3cbf2bc9c4
commit c122e8eee8
+37 -7
View File
@@ -21,7 +21,7 @@ use futures::Future;
use log::{info, warn};
use runtime_primitives::generic::{SignedBlock, BlockId};
use runtime_primitives::traits::{As, Block, Header};
use runtime_primitives::traits::{As, Block, Header, NumberFor};
use consensus_common::import_queue::{ImportQueue, IncomingBlock, Link};
use network::message;
@@ -66,7 +66,10 @@ pub fn export_blocks<F, E, W>(
});
info!("Exporting blocks from #{} to #{}", block, last);
if !json {
output.write(&(last - block + As::sa(1)).encode())?;
let last_: u64 = last.as_();
let block_: u64 = block.as_();
let len: u64 = last_ - block_ + 1;
output.write(&len.encode())?;
}
loop {
@@ -95,6 +98,25 @@ pub fn export_blocks<F, E, W>(
Ok(())
}
struct WaitLink {
wait_send: std::sync::mpsc::Sender<()>,
}
impl WaitLink {
fn new(wait_send: std::sync::mpsc::Sender<()>) -> WaitLink {
WaitLink {
wait_send,
}
}
}
impl<B: Block> Link<B> for WaitLink {
fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) {
self.wait_send.send(())
.expect("Unable to notify main process; if the main process panicked then this thread would already be dead as well. qed.");
}
}
/// Import blocks from a binary stream.
pub fn import_blocks<F, E, R>(
mut config: FactoryFullConfiguration<F>,
@@ -103,13 +125,13 @@ pub fn import_blocks<F, E, R>(
) -> error::Result<()>
where F: ServiceFactory, E: Future<Item=(),Error=()> + Send + 'static, R: Read,
{
struct DummyLink;
impl<B: Block> Link<B> for DummyLink { }
let client = new_client::<F>(&config)?;
// FIXME #1134 this shouldn't need a mutable config.
let queue = components::FullComponents::<F>::build_import_queue(&mut config, client.clone())?;
queue.start(DummyLink)?;
let (wait_send, wait_recv) = std::sync::mpsc::channel();
let wait_link = WaitLink::new(wait_send);
queue.start(wait_link)?;
let (exit_send, exit_recv) = std::sync::mpsc::channel();
::std::thread::spawn(move || {
@@ -117,7 +139,7 @@ pub fn import_blocks<F, E, R>(
let _ = exit_send.send(());
});
let count: u32 = Decode::decode(&mut input).ok_or("Error reading file")?;
let count: u64 = Decode::decode(&mut input).ok_or("Error reading file")?;
info!("Importing {} blocks", count);
let mut block_count = 0;
for b in 0 .. count {
@@ -155,6 +177,14 @@ pub fn import_blocks<F, E, R>(
info!("#{}", b);
}
}
let mut blocks_imported = 0;
while blocks_imported < count {
wait_recv.recv()
.expect("Importing thread has panicked. Then the main process will die before this can be reached. qed.");
blocks_imported += 1;
}
info!("Imported {} blocks. Best: #{}", block_count, client.info()?.chain.best_number);
Ok(())