diff --git a/substrate/client/cli/src/lib.rs b/substrate/client/cli/src/lib.rs index 1209518cba..696a6eaeae 100644 --- a/substrate/client/cli/src/lib.rs +++ b/substrate/client/cli/src/lib.rs @@ -62,7 +62,7 @@ use app_dirs::{AppInfo, AppDataType}; use log::info; use lazy_static::lazy_static; -use futures::Future; +use futures::{Async, Future}; use substrate_telemetry::TelemetryEndpoints; /// default sub directory to store network config @@ -388,7 +388,25 @@ impl<'a> ParseAndPrepareExport<'a> { None => Box::new(stdout()), }; - builder(config)?.export_blocks(exit.into_exit(), file, from.into(), to.map(Into::into), json)?; + // Note: while we would like the user to handle the exit themselves, we handle it here + // for backwards compatibility reasons. + let (exit_send, exit_recv) = std::sync::mpsc::channel(); + let exit = exit.into_exit(); + std::thread::spawn(move || { + let _ = exit.wait(); + let _ = exit_send.send(()); + }); + + let mut export_fut = builder(config)?.export_blocks(file, from.into(), to.map(Into::into), json); + let fut = futures::future::poll_fn(|| { + if exit_recv.try_recv().is_ok() { + return Ok(Async::Ready(())); + } + export_fut.poll() + }); + + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + runtime.block_on(fut)?; Ok(()) } } @@ -423,7 +441,7 @@ impl<'a> ParseAndPrepareImport<'a> { ..Default::default() }; - let file: Box = match self.params.input { + let file: Box = match self.params.input { Some(filename) => Box::new(File::open(filename)?), None => { let mut buffer = Vec::new(); @@ -432,8 +450,25 @@ impl<'a> ParseAndPrepareImport<'a> { }, }; - let fut = builder(config)?.import_blocks(exit.into_exit(), file)?; - tokio::run(fut); + // Note: while we would like the user to handle the exit themselves, we handle it here + // for backwards compatibility reasons. + let (exit_send, exit_recv) = std::sync::mpsc::channel(); + let exit = exit.into_exit(); + std::thread::spawn(move || { + let _ = exit.wait(); + let _ = exit_send.send(()); + }); + + let mut import_fut = builder(config)?.import_blocks(file); + let fut = futures::future::poll_fn(|| { + if exit_recv.try_recv().is_ok() { + return Ok(Async::Ready(())); + } + import_fut.poll() + }); + + let mut runtime = tokio::runtime::current_thread::Runtime::new().unwrap(); + runtime.block_on(fut)?; Ok(()) } } diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 679b11c224..9d2b1ebb8f 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -636,9 +636,8 @@ pub trait ServiceBuilderImport { /// Starts the process of importing blocks. fn import_blocks( self, - exit: impl Future + Send + 'static, - input: impl Read + Seek, - ) -> Result + Send>, Error>; + input: impl Read + Seek + Send + 'static, + ) -> Box + Send>; } /// Implemented on `ServiceBuilder`. Allows exporting blocks once you have given all the required @@ -649,13 +648,12 @@ pub trait ServiceBuilderExport { /// Performs the blocks export. fn export_blocks( - &self, - exit: impl Future + Send + 'static, - output: impl Write, + self, + output: impl Write + 'static, from: NumberFor, to: Option>, json: bool - ) -> Result<(), Error>; + ) -> Box>; } /// Implemented on `ServiceBuilder`. Allows reverting the chain once you have given all the @@ -687,13 +685,11 @@ impl< { fn import_blocks( self, - exit: impl Future + Send + 'static, - input: impl Read + Seek, - ) -> Result + Send>, Error> { + input: impl Read + Seek + Send + 'static, + ) -> Box + Send> { let client = self.client; let mut queue = self.import_queue; - import_blocks!(TBl, client, queue, exit, input) - .map(|f| Box::new(f) as Box<_>) + Box::new(import_blocks!(TBl, client, queue, input).compat()) } } @@ -703,20 +699,20 @@ impl::Out>, TBackend: 'static + client_api::backend::Backend + Send, - TExec: 'static + client::CallExecutor + Send + Sync + Clone + TExec: 'static + client::CallExecutor + Send + Sync + Clone, + TRtApi: 'static + Send + Sync, { type Block = TBl; fn export_blocks( - &self, - exit: impl Future + Send + 'static, - mut output: impl Write, + self, + mut output: impl Write + 'static, from: NumberFor, to: Option>, json: bool - ) -> Result<(), Error> { - let client = &self.client; - export_blocks!(client, exit, output, from, to, json) + ) -> Box> { + let client = self.client; + Box::new(export_blocks!(client, output, from, to, json).compat()) } } diff --git a/substrate/client/service/src/chain_ops.rs b/substrate/client/service/src/chain_ops.rs index 15d0189381..2ccd120006 100644 --- a/substrate/client/service/src/chain_ops.rs +++ b/substrate/client/service/src/chain_ops.rs @@ -23,7 +23,7 @@ use chain_spec::{ChainSpec, RuntimeGenesis, Extension}; #[macro_export] /// Export blocks macro_rules! export_blocks { -($client:ident, $exit:ident, $output:ident, $from:ident, $to:ident, $json:ident) => {{ +($client:ident, $output:ident, $from:ident, $to:ident, $json:ident) => {{ let mut block = $from; let last = match $to { @@ -32,27 +32,31 @@ macro_rules! export_blocks { None => $client.info().chain.best_number, }; - if last < block { - return Err("Invalid block range specified".into()); - } + let mut wrote_header = false; - let (exit_send, exit_recv) = std::sync::mpsc::channel(); - std::thread::spawn(move || { - let _ = $exit.wait(); - let _ = exit_send.send(()); - }); - info!("Exporting blocks from #{} to #{}", block, last); - if !$json { - let last_: u64 = last.saturated_into::(); - let block_: u64 = block.saturated_into::(); - let len: u64 = last_ - block_ + 1; - $output.write_all(&len.encode())?; - } - - loop { - if exit_recv.try_recv().is_ok() { - break; + // Exporting blocks is implemented as a future, because we want the operation to be + // interruptible. + // + // Every time we write a block to the output, the `Future` re-schedules itself and returns + // `Poll::Pending`. + // This makes it possible either to interleave other operations in-between the block exports, + // or to stop the operation completely. + futures03::future::poll_fn(move |cx| { + if last < block { + return std::task::Poll::Ready(Err("Invalid block range specified".into())); } + + if !wrote_header { + info!("Exporting blocks from #{} to #{}", block, last); + if !$json { + let last_: u64 = last.saturated_into::(); + let block_: u64 = block.saturated_into::(); + let len: u64 = last_ - block_ + 1; + $output.write_all(&len.encode())?; + } + wrote_header = true; + } + match $client.block(&BlockId::number(block))? { Some(block) => { if $json { @@ -62,17 +66,21 @@ macro_rules! export_blocks { $output.write_all(&block.encode())?; } }, - None => break, + // Reached end of the chain. + None => return std::task::Poll::Ready(Ok(())), } if (block % 10000.into()).is_zero() { info!("#{}", block); } if block == last { - break; + return std::task::Poll::Ready(Ok(())); } block += One::one(); - } - Ok(()) + + // Re-schedule the task in order to continue the operation. + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }) }} } @@ -80,13 +88,12 @@ macro_rules! export_blocks { #[macro_export] /// Import blocks macro_rules! import_blocks { -($block:ty, $client:ident, $queue:ident, $exit:ident, $input:ident) => {{ +($block:ty, $client:ident, $queue:ident, $input:ident) => {{ use consensus_common::import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult}; use consensus_common::BlockOrigin; use network::message; use sr_primitives::generic::SignedBlock; use sr_primitives::traits::Block; - use futures03::TryFutureExt as _; struct WaitLink { imported_blocks: u64, @@ -121,75 +128,88 @@ macro_rules! import_blocks { } } - let (exit_send, exit_recv) = std::sync::mpsc::channel(); - std::thread::spawn(move || { - let _ = $exit.wait(); - let _ = exit_send.send(()); - }); - let mut io_reader_input = IoReader($input); - let count: u64 = Decode::decode(&mut io_reader_input) - .map_err(|e| format!("Error reading file: {}", e))?; - info!("Importing {} blocks", count); - let mut block_count = 0; - for b in 0 .. count { - if exit_recv.try_recv().is_ok() { - break; - } - match SignedBlock::<$block>::decode(&mut io_reader_input) { - Ok(signed) => { - let (header, extrinsics) = signed.block.deconstruct(); - let hash = header.hash(); - let block = message::BlockData::<$block> { - hash, - justification: signed.justification, - header: Some(header), - body: Some(extrinsics), - receipt: None, - message_queue: None - }; - // import queue handles verification and importing it into the client - $queue.import_blocks(BlockOrigin::File, vec![ - IncomingBlock::<$block> { - hash: block.hash, - header: block.header, - body: block.body, - justification: block.justification, - origin: None, - allow_missing_state: false, - } - ]); - } - Err(e) => { - warn!("Error reading block data at {}: {}", b, e); - break; - } - } - - block_count = b; - if b % 1000 == 0 && b != 0 { - info!("#{} blocks were added to the queue", b); - } - } - + let mut count = None::; + let mut read_block_count = 0; let mut link = WaitLink::new(); - Ok(futures::future::poll_fn(move || { - if exit_recv.try_recv().is_ok() { - return Ok(Async::Ready(())); + + // Importing blocks is implemented as a future, because we want the operation to be + // interruptible. + // + // Every time we read a block from the input or import a bunch of blocks from the import + // queue, the `Future` re-schedules itself and returns `Poll::Pending`. + // This makes it possible either to interleave other operations in-between the block imports, + // or to stop the operation completely. + futures03::future::poll_fn(move |cx| { + // Start by reading the number of blocks if not done so already. + let count = match count { + Some(c) => c, + None => { + let c: u64 = match Decode::decode(&mut io_reader_input) { + Ok(c) => c, + Err(err) => { + let err = format!("Error reading file: {}", err); + return std::task::Poll::Ready(Err(From::from(err))); + }, + }; + info!("Importing {} blocks", c); + count = Some(c); + c + } + }; + + // Read blocks from the input. + if read_block_count < count { + match SignedBlock::<$block>::decode(&mut io_reader_input) { + Ok(signed) => { + let (header, extrinsics) = signed.block.deconstruct(); + let hash = header.hash(); + let block = message::BlockData::<$block> { + hash, + justification: signed.justification, + header: Some(header), + body: Some(extrinsics), + receipt: None, + message_queue: None + }; + // import queue handles verification and importing it into the client + $queue.import_blocks(BlockOrigin::File, vec![ + IncomingBlock::<$block> { + hash: block.hash, + header: block.header, + body: block.body, + justification: block.justification, + origin: None, + allow_missing_state: false, + } + ]); + } + Err(e) => { + warn!("Error reading block data at {}: {}", read_block_count, e); + return std::task::Poll::Ready(Ok(())); + } + } + + read_block_count += 1; + if read_block_count % 1000 == 0 { + info!("#{} blocks were added to the queue", read_block_count); + } + + cx.waker().wake_by_ref(); + return std::task::Poll::Pending; } let blocks_before = link.imported_blocks; - let _ = futures03::future::poll_fn(|cx| { - $queue.poll_actions(cx, &mut link); - std::task::Poll::Pending::> - }).compat().poll(); + $queue.poll_actions(cx, &mut link); + if link.has_error { info!( "Stopping after #{} blocks because of an error", link.imported_blocks, ); - return Ok(Async::Ready(())); + return std::task::Poll::Ready(Ok(())); } + if link.imported_blocks / 1000 != blocks_before / 1000 { info!( "#{} blocks were imported (#{} left)", @@ -197,13 +217,16 @@ macro_rules! import_blocks { count - link.imported_blocks ); } + if link.imported_blocks >= count { - info!("Imported {} blocks. Best: #{}", block_count, $client.info().chain.best_number); - Ok(Async::Ready(())) + info!("Imported {} blocks. Best: #{}", read_block_count, $client.info().chain.best_number); + return std::task::Poll::Ready(Ok(())); + } else { - Ok(Async::NotReady) + // Polling the import queue will re-schedule the task when ready. + return std::task::Poll::Pending; } - })) + }) }} }