mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-17 12:31:03 +00:00
Remove the exit parameter from importing and exporting blocks (#3954)
* export_blocks now generates a Future * Handle the export blocks exit in substrate-cli instead * Turn import_blocks entirely into a Future * Move the exit parameter to substrate-cli for imports
This commit is contained in:
committed by
Bastian Köcher
parent
75a0a3f6fd
commit
f9a4cff50a
@@ -62,7 +62,7 @@ use app_dirs::{AppInfo, AppDataType};
|
|||||||
use log::info;
|
use log::info;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
use futures::Future;
|
use futures::{Async, Future};
|
||||||
use substrate_telemetry::TelemetryEndpoints;
|
use substrate_telemetry::TelemetryEndpoints;
|
||||||
|
|
||||||
/// default sub directory to store network config
|
/// default sub directory to store network config
|
||||||
@@ -388,7 +388,25 @@ impl<'a> ParseAndPrepareExport<'a> {
|
|||||||
None => Box::new(stdout()),
|
None => Box::new(stdout()),
|
||||||
};
|
};
|
||||||
|
|
||||||
builder(config)?.export_blocks(exit.into_exit(), file, from.into(), to.map(Into::into), json)?;
|
// Note: while we would like the user to handle the exit themselves, we handle it here
|
||||||
|
// for backwards compatibility reasons.
|
||||||
|
let (exit_send, exit_recv) = std::sync::mpsc::channel();
|
||||||
|
let exit = exit.into_exit();
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
let _ = exit.wait();
|
||||||
|
let _ = exit_send.send(());
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut export_fut = builder(config)?.export_blocks(file, from.into(), to.map(Into::into), json);
|
||||||
|
let fut = futures::future::poll_fn(|| {
|
||||||
|
if exit_recv.try_recv().is_ok() {
|
||||||
|
return Ok(Async::Ready(()));
|
||||||
|
}
|
||||||
|
export_fut.poll()
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
|
||||||
|
runtime.block_on(fut)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -423,7 +441,7 @@ impl<'a> ParseAndPrepareImport<'a> {
|
|||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
let file: Box<dyn ReadPlusSeek> = match self.params.input {
|
let file: Box<dyn ReadPlusSeek + Send> = match self.params.input {
|
||||||
Some(filename) => Box::new(File::open(filename)?),
|
Some(filename) => Box::new(File::open(filename)?),
|
||||||
None => {
|
None => {
|
||||||
let mut buffer = Vec::new();
|
let mut buffer = Vec::new();
|
||||||
@@ -432,8 +450,25 @@ impl<'a> ParseAndPrepareImport<'a> {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
let fut = builder(config)?.import_blocks(exit.into_exit(), file)?;
|
// Note: while we would like the user to handle the exit themselves, we handle it here
|
||||||
tokio::run(fut);
|
// for backwards compatibility reasons.
|
||||||
|
let (exit_send, exit_recv) = std::sync::mpsc::channel();
|
||||||
|
let exit = exit.into_exit();
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
let _ = exit.wait();
|
||||||
|
let _ = exit_send.send(());
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut import_fut = builder(config)?.import_blocks(file);
|
||||||
|
let fut = futures::future::poll_fn(|| {
|
||||||
|
if exit_recv.try_recv().is_ok() {
|
||||||
|
return Ok(Async::Ready(()));
|
||||||
|
}
|
||||||
|
import_fut.poll()
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap();
|
||||||
|
runtime.block_on(fut)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -636,9 +636,8 @@ pub trait ServiceBuilderImport {
|
|||||||
/// Starts the process of importing blocks.
|
/// Starts the process of importing blocks.
|
||||||
fn import_blocks(
|
fn import_blocks(
|
||||||
self,
|
self,
|
||||||
exit: impl Future<Item=(),Error=()> + Send + 'static,
|
input: impl Read + Seek + Send + 'static,
|
||||||
input: impl Read + Seek,
|
) -> Box<dyn Future<Item = (), Error = Error> + Send>;
|
||||||
) -> Result<Box<dyn Future<Item = (), Error = ()> + Send>, Error>;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Implemented on `ServiceBuilder`. Allows exporting blocks once you have given all the required
|
/// Implemented on `ServiceBuilder`. Allows exporting blocks once you have given all the required
|
||||||
@@ -649,13 +648,12 @@ pub trait ServiceBuilderExport {
|
|||||||
|
|
||||||
/// Performs the blocks export.
|
/// Performs the blocks export.
|
||||||
fn export_blocks(
|
fn export_blocks(
|
||||||
&self,
|
self,
|
||||||
exit: impl Future<Item=(),Error=()> + Send + 'static,
|
output: impl Write + 'static,
|
||||||
output: impl Write,
|
|
||||||
from: NumberFor<Self::Block>,
|
from: NumberFor<Self::Block>,
|
||||||
to: Option<NumberFor<Self::Block>>,
|
to: Option<NumberFor<Self::Block>>,
|
||||||
json: bool
|
json: bool
|
||||||
) -> Result<(), Error>;
|
) -> Box<dyn Future<Item = (), Error = Error>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Implemented on `ServiceBuilder`. Allows reverting the chain once you have given all the
|
/// Implemented on `ServiceBuilder`. Allows reverting the chain once you have given all the
|
||||||
@@ -687,13 +685,11 @@ impl<
|
|||||||
{
|
{
|
||||||
fn import_blocks(
|
fn import_blocks(
|
||||||
self,
|
self,
|
||||||
exit: impl Future<Item=(),Error=()> + Send + 'static,
|
input: impl Read + Seek + Send + 'static,
|
||||||
input: impl Read + Seek,
|
) -> Box<dyn Future<Item = (), Error = Error> + Send> {
|
||||||
) -> Result<Box<dyn Future<Item = (), Error = ()> + Send>, Error> {
|
|
||||||
let client = self.client;
|
let client = self.client;
|
||||||
let mut queue = self.import_queue;
|
let mut queue = self.import_queue;
|
||||||
import_blocks!(TBl, client, queue, exit, input)
|
Box::new(import_blocks!(TBl, client, queue, input).compat())
|
||||||
.map(|f| Box::new(f) as Box<_>)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -703,20 +699,20 @@ impl<TBl, TRtApi, TCfg, TGen, TCSExt, TBackend, TExec, TFchr, TSc, TImpQu, TFprb
|
|||||||
where
|
where
|
||||||
TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
|
TBl: BlockT<Hash = <Blake2Hasher as Hasher>::Out>,
|
||||||
TBackend: 'static + client_api::backend::Backend<TBl, Blake2Hasher> + Send,
|
TBackend: 'static + client_api::backend::Backend<TBl, Blake2Hasher> + Send,
|
||||||
TExec: 'static + client::CallExecutor<TBl, Blake2Hasher> + Send + Sync + Clone
|
TExec: 'static + client::CallExecutor<TBl, Blake2Hasher> + Send + Sync + Clone,
|
||||||
|
TRtApi: 'static + Send + Sync,
|
||||||
{
|
{
|
||||||
type Block = TBl;
|
type Block = TBl;
|
||||||
|
|
||||||
fn export_blocks(
|
fn export_blocks(
|
||||||
&self,
|
self,
|
||||||
exit: impl Future<Item=(),Error=()> + Send + 'static,
|
mut output: impl Write + 'static,
|
||||||
mut output: impl Write,
|
|
||||||
from: NumberFor<TBl>,
|
from: NumberFor<TBl>,
|
||||||
to: Option<NumberFor<TBl>>,
|
to: Option<NumberFor<TBl>>,
|
||||||
json: bool
|
json: bool
|
||||||
) -> Result<(), Error> {
|
) -> Box<dyn Future<Item = (), Error = Error>> {
|
||||||
let client = &self.client;
|
let client = self.client;
|
||||||
export_blocks!(client, exit, output, from, to, json)
|
Box::new(export_blocks!(client, output, from, to, json).compat())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ use chain_spec::{ChainSpec, RuntimeGenesis, Extension};
|
|||||||
#[macro_export]
|
#[macro_export]
|
||||||
/// Export blocks
|
/// Export blocks
|
||||||
macro_rules! export_blocks {
|
macro_rules! export_blocks {
|
||||||
($client:ident, $exit:ident, $output:ident, $from:ident, $to:ident, $json:ident) => {{
|
($client:ident, $output:ident, $from:ident, $to:ident, $json:ident) => {{
|
||||||
let mut block = $from;
|
let mut block = $from;
|
||||||
|
|
||||||
let last = match $to {
|
let last = match $to {
|
||||||
@@ -32,27 +32,31 @@ macro_rules! export_blocks {
|
|||||||
None => $client.info().chain.best_number,
|
None => $client.info().chain.best_number,
|
||||||
};
|
};
|
||||||
|
|
||||||
if last < block {
|
let mut wrote_header = false;
|
||||||
return Err("Invalid block range specified".into());
|
|
||||||
}
|
|
||||||
|
|
||||||
let (exit_send, exit_recv) = std::sync::mpsc::channel();
|
// Exporting blocks is implemented as a future, because we want the operation to be
|
||||||
std::thread::spawn(move || {
|
// interruptible.
|
||||||
let _ = $exit.wait();
|
//
|
||||||
let _ = exit_send.send(());
|
// Every time we write a block to the output, the `Future` re-schedules itself and returns
|
||||||
});
|
// `Poll::Pending`.
|
||||||
info!("Exporting blocks from #{} to #{}", block, last);
|
// This makes it possible either to interleave other operations in-between the block exports,
|
||||||
if !$json {
|
// or to stop the operation completely.
|
||||||
let last_: u64 = last.saturated_into::<u64>();
|
futures03::future::poll_fn(move |cx| {
|
||||||
let block_: u64 = block.saturated_into::<u64>();
|
if last < block {
|
||||||
let len: u64 = last_ - block_ + 1;
|
return std::task::Poll::Ready(Err("Invalid block range specified".into()));
|
||||||
$output.write_all(&len.encode())?;
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
|
||||||
if exit_recv.try_recv().is_ok() {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !wrote_header {
|
||||||
|
info!("Exporting blocks from #{} to #{}", block, last);
|
||||||
|
if !$json {
|
||||||
|
let last_: u64 = last.saturated_into::<u64>();
|
||||||
|
let block_: u64 = block.saturated_into::<u64>();
|
||||||
|
let len: u64 = last_ - block_ + 1;
|
||||||
|
$output.write_all(&len.encode())?;
|
||||||
|
}
|
||||||
|
wrote_header = true;
|
||||||
|
}
|
||||||
|
|
||||||
match $client.block(&BlockId::number(block))? {
|
match $client.block(&BlockId::number(block))? {
|
||||||
Some(block) => {
|
Some(block) => {
|
||||||
if $json {
|
if $json {
|
||||||
@@ -62,17 +66,21 @@ macro_rules! export_blocks {
|
|||||||
$output.write_all(&block.encode())?;
|
$output.write_all(&block.encode())?;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
None => break,
|
// Reached end of the chain.
|
||||||
|
None => return std::task::Poll::Ready(Ok(())),
|
||||||
}
|
}
|
||||||
if (block % 10000.into()).is_zero() {
|
if (block % 10000.into()).is_zero() {
|
||||||
info!("#{}", block);
|
info!("#{}", block);
|
||||||
}
|
}
|
||||||
if block == last {
|
if block == last {
|
||||||
break;
|
return std::task::Poll::Ready(Ok(()));
|
||||||
}
|
}
|
||||||
block += One::one();
|
block += One::one();
|
||||||
}
|
|
||||||
Ok(())
|
// Re-schedule the task in order to continue the operation.
|
||||||
|
cx.waker().wake_by_ref();
|
||||||
|
std::task::Poll::Pending
|
||||||
|
})
|
||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,13 +88,12 @@ macro_rules! export_blocks {
|
|||||||
#[macro_export]
|
#[macro_export]
|
||||||
/// Import blocks
|
/// Import blocks
|
||||||
macro_rules! import_blocks {
|
macro_rules! import_blocks {
|
||||||
($block:ty, $client:ident, $queue:ident, $exit:ident, $input:ident) => {{
|
($block:ty, $client:ident, $queue:ident, $input:ident) => {{
|
||||||
use consensus_common::import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult};
|
use consensus_common::import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult};
|
||||||
use consensus_common::BlockOrigin;
|
use consensus_common::BlockOrigin;
|
||||||
use network::message;
|
use network::message;
|
||||||
use sr_primitives::generic::SignedBlock;
|
use sr_primitives::generic::SignedBlock;
|
||||||
use sr_primitives::traits::Block;
|
use sr_primitives::traits::Block;
|
||||||
use futures03::TryFutureExt as _;
|
|
||||||
|
|
||||||
struct WaitLink {
|
struct WaitLink {
|
||||||
imported_blocks: u64,
|
imported_blocks: u64,
|
||||||
@@ -121,75 +128,88 @@ macro_rules! import_blocks {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let (exit_send, exit_recv) = std::sync::mpsc::channel();
|
|
||||||
std::thread::spawn(move || {
|
|
||||||
let _ = $exit.wait();
|
|
||||||
let _ = exit_send.send(());
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut io_reader_input = IoReader($input);
|
let mut io_reader_input = IoReader($input);
|
||||||
let count: u64 = Decode::decode(&mut io_reader_input)
|
let mut count = None::<u64>;
|
||||||
.map_err(|e| format!("Error reading file: {}", e))?;
|
let mut read_block_count = 0;
|
||||||
info!("Importing {} blocks", count);
|
|
||||||
let mut block_count = 0;
|
|
||||||
for b in 0 .. count {
|
|
||||||
if exit_recv.try_recv().is_ok() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
match SignedBlock::<$block>::decode(&mut io_reader_input) {
|
|
||||||
Ok(signed) => {
|
|
||||||
let (header, extrinsics) = signed.block.deconstruct();
|
|
||||||
let hash = header.hash();
|
|
||||||
let block = message::BlockData::<$block> {
|
|
||||||
hash,
|
|
||||||
justification: signed.justification,
|
|
||||||
header: Some(header),
|
|
||||||
body: Some(extrinsics),
|
|
||||||
receipt: None,
|
|
||||||
message_queue: None
|
|
||||||
};
|
|
||||||
// import queue handles verification and importing it into the client
|
|
||||||
$queue.import_blocks(BlockOrigin::File, vec![
|
|
||||||
IncomingBlock::<$block> {
|
|
||||||
hash: block.hash,
|
|
||||||
header: block.header,
|
|
||||||
body: block.body,
|
|
||||||
justification: block.justification,
|
|
||||||
origin: None,
|
|
||||||
allow_missing_state: false,
|
|
||||||
}
|
|
||||||
]);
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error reading block data at {}: {}", b, e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
block_count = b;
|
|
||||||
if b % 1000 == 0 && b != 0 {
|
|
||||||
info!("#{} blocks were added to the queue", b);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut link = WaitLink::new();
|
let mut link = WaitLink::new();
|
||||||
Ok(futures::future::poll_fn(move || {
|
|
||||||
if exit_recv.try_recv().is_ok() {
|
// Importing blocks is implemented as a future, because we want the operation to be
|
||||||
return Ok(Async::Ready(()));
|
// interruptible.
|
||||||
|
//
|
||||||
|
// Every time we read a block from the input or import a bunch of blocks from the import
|
||||||
|
// queue, the `Future` re-schedules itself and returns `Poll::Pending`.
|
||||||
|
// This makes it possible either to interleave other operations in-between the block imports,
|
||||||
|
// or to stop the operation completely.
|
||||||
|
futures03::future::poll_fn(move |cx| {
|
||||||
|
// Start by reading the number of blocks if not done so already.
|
||||||
|
let count = match count {
|
||||||
|
Some(c) => c,
|
||||||
|
None => {
|
||||||
|
let c: u64 = match Decode::decode(&mut io_reader_input) {
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(err) => {
|
||||||
|
let err = format!("Error reading file: {}", err);
|
||||||
|
return std::task::Poll::Ready(Err(From::from(err)));
|
||||||
|
},
|
||||||
|
};
|
||||||
|
info!("Importing {} blocks", c);
|
||||||
|
count = Some(c);
|
||||||
|
c
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Read blocks from the input.
|
||||||
|
if read_block_count < count {
|
||||||
|
match SignedBlock::<$block>::decode(&mut io_reader_input) {
|
||||||
|
Ok(signed) => {
|
||||||
|
let (header, extrinsics) = signed.block.deconstruct();
|
||||||
|
let hash = header.hash();
|
||||||
|
let block = message::BlockData::<$block> {
|
||||||
|
hash,
|
||||||
|
justification: signed.justification,
|
||||||
|
header: Some(header),
|
||||||
|
body: Some(extrinsics),
|
||||||
|
receipt: None,
|
||||||
|
message_queue: None
|
||||||
|
};
|
||||||
|
// import queue handles verification and importing it into the client
|
||||||
|
$queue.import_blocks(BlockOrigin::File, vec![
|
||||||
|
IncomingBlock::<$block> {
|
||||||
|
hash: block.hash,
|
||||||
|
header: block.header,
|
||||||
|
body: block.body,
|
||||||
|
justification: block.justification,
|
||||||
|
origin: None,
|
||||||
|
allow_missing_state: false,
|
||||||
|
}
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error reading block data at {}: {}", read_block_count, e);
|
||||||
|
return std::task::Poll::Ready(Ok(()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
read_block_count += 1;
|
||||||
|
if read_block_count % 1000 == 0 {
|
||||||
|
info!("#{} blocks were added to the queue", read_block_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
cx.waker().wake_by_ref();
|
||||||
|
return std::task::Poll::Pending;
|
||||||
}
|
}
|
||||||
|
|
||||||
let blocks_before = link.imported_blocks;
|
let blocks_before = link.imported_blocks;
|
||||||
let _ = futures03::future::poll_fn(|cx| {
|
$queue.poll_actions(cx, &mut link);
|
||||||
$queue.poll_actions(cx, &mut link);
|
|
||||||
std::task::Poll::Pending::<Result<(), ()>>
|
|
||||||
}).compat().poll();
|
|
||||||
if link.has_error {
|
if link.has_error {
|
||||||
info!(
|
info!(
|
||||||
"Stopping after #{} blocks because of an error",
|
"Stopping after #{} blocks because of an error",
|
||||||
link.imported_blocks,
|
link.imported_blocks,
|
||||||
);
|
);
|
||||||
return Ok(Async::Ready(()));
|
return std::task::Poll::Ready(Ok(()));
|
||||||
}
|
}
|
||||||
|
|
||||||
if link.imported_blocks / 1000 != blocks_before / 1000 {
|
if link.imported_blocks / 1000 != blocks_before / 1000 {
|
||||||
info!(
|
info!(
|
||||||
"#{} blocks were imported (#{} left)",
|
"#{} blocks were imported (#{} left)",
|
||||||
@@ -197,13 +217,16 @@ macro_rules! import_blocks {
|
|||||||
count - link.imported_blocks
|
count - link.imported_blocks
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if link.imported_blocks >= count {
|
if link.imported_blocks >= count {
|
||||||
info!("Imported {} blocks. Best: #{}", block_count, $client.info().chain.best_number);
|
info!("Imported {} blocks. Best: #{}", read_block_count, $client.info().chain.best_number);
|
||||||
Ok(Async::Ready(()))
|
return std::task::Poll::Ready(Ok(()));
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
Ok(Async::NotReady)
|
// Polling the import queue will re-schedule the task when ready.
|
||||||
|
return std::task::Poll::Pending;
|
||||||
}
|
}
|
||||||
}))
|
})
|
||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user