Remove ServiceBuilderCommand and implement the chain ops as standalone functions instead. (#6543)

* :)

* Slight tidy

* Remove ServiceBuilderCommand

* Remove whitespace

* Keep task manager alive for check_block/import_blocks

* Pass task_manager to run_until_exit

* Make task_manager in run_until_exit and make subcommands async

* Change the async_run fn to return a future and task manager

* async_run should take a result fn

* Apply suggestions from code review

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>

* Fix spaces in export_raw_state

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
This commit is contained in:
Ashley
2020-07-02 12:57:56 +02:00
committed by GitHub
parent 424d5c722d
commit 5f751e4472
21 changed files with 960 additions and 805 deletions
+51 -50
View File
@@ -195,7 +195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d0864d84b8e07b145449be9a8537db86bf9de5ce03b913214694643b4743502"
dependencies = [
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -1040,7 +1040,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47c5e5ac752e18207b12e16b10631ae5f7f68f8805f335f9b817ead83d9ffce1"
dependencies = [
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -1080,7 +1080,7 @@ checksum = "e2323f3f47db9a0e77ce7a300605d8d2098597fc451ed1a97bb1f6411bb550a7"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -1182,7 +1182,7 @@ checksum = "2ed9afacaea0301eefb738c9deea725e6d53938004597cdc518a8cf9a7aa2f03"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -1335,7 +1335,7 @@ checksum = "030a733c8287d6213886dd487564ff5c8f6aae10278b3588ed177f9d18f8d231"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
"synstructure",
]
@@ -1526,7 +1526,7 @@ dependencies = [
"frame-support-procedural-tools",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -1537,7 +1537,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -1546,7 +1546,7 @@ version = "2.0.0-rc4"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -1763,7 +1763,7 @@ dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -2309,7 +2309,7 @@ checksum = "7ef5550a42e3740a0e71f909d4c861056a284060af885ae7aa6242820f920d9d"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -2448,7 +2448,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -2740,7 +2740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f09548626b737ed64080fde595e06ce1117795b8b9fc4d2629fa36561c583171"
dependencies = [
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -4635,7 +4635,7 @@ dependencies = [
"proc-macro2",
"quote 1.0.6",
"sp-runtime",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -4869,7 +4869,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -4919,7 +4919,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f557c32c6d268a07c921471619c0295f5efad3a0e76d4f97a05c091a51d110b2"
dependencies = [
"proc-macro2",
"syn 1.0.17",
"syn 1.0.33",
"synstructure",
]
@@ -5007,7 +5007,7 @@ dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -5071,7 +5071,7 @@ checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -5210,7 +5210,7 @@ dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
"version_check",
]
@@ -5222,7 +5222,7 @@ checksum = "4f5444ead4e9935abd7f27dc51f7e852a0569ac888096d5ec2499470794e2e53"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
"syn-mid",
"version_check",
]
@@ -5241,9 +5241,9 @@ checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694"
[[package]]
name = "proc-macro2"
version = "1.0.10"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3"
checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa"
dependencies = [
"unicode-xid 0.2.0",
]
@@ -5315,7 +5315,7 @@ dependencies = [
"itertools 0.8.2",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -5683,7 +5683,7 @@ checksum = "602eb59cda66fcb9aec25841fb76bc01d2b34282dcdd705028da297db6f3eec8"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -5763,7 +5763,7 @@ checksum = "475e68978dc5b743f2f40d8e0a8fdc83f1c5e78cbf4b8fa5e74e73beebc340de"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -5888,7 +5888,7 @@ checksum = "b3bba175698996010c4f6dce5e7f173b6eb781fce25d2cfc45e27091ce0b79f6"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -6021,7 +6021,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -6048,6 +6048,7 @@ dependencies = [
"sc-service",
"sc-telemetry",
"sc-tracing",
"serde",
"serde_json",
"sp-blockchain",
"sp-core",
@@ -7076,7 +7077,7 @@ checksum = "f8584eea9b9ff42825b46faf46a8c24d2cff13ec152fa2a50df788b87c07ee28"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -7166,22 +7167,22 @@ checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0"
[[package]]
name = "serde"
version = "1.0.110"
version = "1.0.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c"
checksum = "5317f7588f0a5078ee60ef675ef96735a1442132dc645eb1d12c018620ed8cd3"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.110"
version = "1.0.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984"
checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -7295,7 +7296,7 @@ checksum = "a945ec7f7ce853e89ffa36be1e27dce9a43e82ff9093bf3461c30d5da74ed11b"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -7393,7 +7394,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -7657,7 +7658,7 @@ version = "2.0.0-rc4"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -7756,7 +7757,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -7848,7 +7849,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -8149,7 +8150,7 @@ dependencies = [
"proc-macro-error",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -8170,7 +8171,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -8538,9 +8539,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.17"
version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0df0eb663f387145cab623dea85b09c2c5b4b0aef44e945d928e682fce71bb03"
checksum = "e8d5d96e8cbb005d6959f119f773bfaebb5684296108fb32600c00cde305b2cd"
dependencies = [
"proc-macro2",
"quote 1.0.6",
@@ -8555,7 +8556,7 @@ checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -8575,7 +8576,7 @@ checksum = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
"unicode-xid 0.2.0",
]
@@ -8638,7 +8639,7 @@ dependencies = [
"lazy_static",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
"version_check",
]
@@ -8668,7 +8669,7 @@ checksum = "ca972988113b7715266f91250ddb98070d033c62a011fa0fcc57434a649310dd"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -8874,7 +8875,7 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -9075,7 +9076,7 @@ checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
]
[[package]]
@@ -9427,7 +9428,7 @@ dependencies = [
"log",
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
"wasm-bindgen-shared",
]
@@ -9461,7 +9462,7 @@ checksum = "8eb197bd3a47553334907ffd2f16507b4f4f01bbec3ac921a7719e0decdfe72a"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@@ -9782,7 +9783,7 @@ checksum = "de251eec69fc7c1bc3923403d18ececb929380e016afe103da75f396704f8ca2"
dependencies = [
"proc-macro2",
"quote 1.0.6",
"syn 1.0.17",
"syn 1.0.33",
"synstructure",
]
@@ -71,7 +71,10 @@ pub fn run() -> sc_cli::Result<()> {
match &cli.subcommand {
Some(subcommand) => {
let runner = cli.create_runner(subcommand)?;
runner.run_subcommand(subcommand, |config| Ok(new_full_start!(config).0))
runner.run_subcommand(subcommand, |config| {
let (builder, _, _) = new_full_start!(config);
Ok(builder.to_chain_ops_parts())
})
}
None => {
let runner = cli.create_runner(&cli.run)?;
+4 -2
View File
@@ -97,8 +97,10 @@ pub fn run() -> Result<()> {
}
Some(Subcommand::Base(subcommand)) => {
let runner = cli.create_runner(subcommand)?;
runner.run_subcommand(subcommand, |config| Ok(new_full_start!(config).0))
runner.run_subcommand(subcommand, |config| {
let (builder, _, _, _) = new_full_start!(config);
Ok(builder.to_chain_ops_parts())
})
}
}
}
+1
View File
@@ -43,6 +43,7 @@ structopt = "0.3.8"
sc-tracing = { version = "2.0.0-rc4", path = "../tracing" }
chrono = "0.4.10"
parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] }
serde = "1.0.111"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
rpassword = "4.0.1"
@@ -22,7 +22,7 @@ use crate::params::SharedParams;
use crate::CliConfiguration;
use log::info;
use sc_network::config::build_multiaddr;
use sc_service::{config::MultiaddrWithPeerId, Configuration};
use sc_service::{config::{MultiaddrWithPeerId, NetworkConfiguration}, ChainSpec};
use structopt::StructOpt;
use std::io::Write;
@@ -51,13 +51,16 @@ pub struct BuildSpecCmd {
impl BuildSpecCmd {
/// Run the build-spec command
pub fn run(&self, config: Configuration) -> error::Result<()> {
pub fn run(
&self,
mut spec: Box<dyn ChainSpec>,
network_config: NetworkConfiguration,
) -> error::Result<()> {
info!("Building chain spec");
let mut spec = config.chain_spec;
let raw_output = self.raw;
if spec.boot_nodes().is_empty() && !self.disable_default_bootnode {
let keys = config.network.node_key.into_keypair()?;
let keys = network_config.node_key.into_keypair()?;
let peer_id = keys.public().into_peer_id();
let addr = MultiaddrWithPeerId {
multiaddr: build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(30333u16)],
@@ -19,9 +19,9 @@
use crate::{
CliConfiguration, error, params::{ImportParams, SharedParams, BlockNumberOrHash},
};
use sc_service::{Configuration, ServiceBuilderCommand};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{fmt::Debug, str::FromStr};
use sc_client_api::{BlockBackend, UsageProvider};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::{fmt::Debug, str::FromStr, sync::Arc};
use structopt::StructOpt;
/// The `check-block` command used to validate blocks.
@@ -48,21 +48,21 @@ pub struct CheckBlockCmd {
impl CheckBlockCmd {
/// Run the check-block command
pub async fn run<B, BC, BB>(
pub async fn run<B, C, IQ>(
&self,
config: Configuration,
builder: B,
client: Arc<C>,
import_queue: IQ,
) -> error::Result<()>
where
B: FnOnce(Configuration) -> Result<BC, sc_service::error::Error>,
BC: ServiceBuilderCommand<Block = BB> + Unpin,
BB: BlockT + Debug,
<NumberFor<BB> as FromStr>::Err: std::fmt::Debug,
BB::Hash: FromStr,
<BB::Hash as FromStr>::Err: std::fmt::Debug,
B: BlockT + for<'de> serde::Deserialize<'de>,
C: BlockBackend<B> + UsageProvider<B> + Send + Sync + 'static,
IQ: sc_service::ImportQueue<B> + 'static,
B::Hash: FromStr,
<B::Hash as FromStr>::Err: Debug,
<<B::Header as HeaderT>::Number as FromStr>::Err: Debug,
{
let start = std::time::Instant::now();
builder(config)?.check_block(self.input.parse()?).await?;
sc_service::chain_ops::check_block(client, import_queue, self.input.parse()?).await?;
println!("Completed in {} ms.", start.elapsed().as_millis());
Ok(())
@@ -21,13 +21,16 @@ use crate::params::{BlockNumber, DatabaseParams, PruningParams, SharedParams};
use crate::CliConfiguration;
use log::info;
use sc_service::{
config::DatabaseConfig, Configuration, ServiceBuilderCommand,
config::DatabaseConfig, chain_ops::export_blocks,
};
use sc_client_api::{BlockBackend, UsageProvider};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::fmt::Debug;
use std::fs;
use std::io;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use structopt::StructOpt;
/// The `export-blocks` command used to export blocks.
@@ -68,19 +71,17 @@ pub struct ExportBlocksCmd {
impl ExportBlocksCmd {
/// Run the export-blocks command
pub async fn run<B, BC, BB>(
pub async fn run<B, C>(
&self,
config: Configuration,
builder: B,
client: Arc<C>,
database_config: DatabaseConfig,
) -> error::Result<()>
where
B: FnOnce(Configuration) -> Result<BC, sc_service::error::Error>,
BC: ServiceBuilderCommand<Block = BB> + Unpin,
BB: sp_runtime::traits::Block + Debug,
<<<BB as BlockT>::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug,
<BB as BlockT>::Hash: std::str::FromStr,
B: BlockT,
C: BlockBackend<B> + UsageProvider<B> + 'static,
<<B::Header as HeaderT>::Number as FromStr>::Err: Debug,
{
if let DatabaseConfig::RocksDb { ref path, .. } = &config.database {
if let DatabaseConfig::RocksDb { ref path, .. } = database_config {
info!("DB path: {}", path.display());
}
@@ -94,8 +95,7 @@ impl ExportBlocksCmd {
None => Box::new(io::stdout()),
};
builder(config)?
.export_blocks(file, from.into(), to, binary)
export_blocks(client, file, from.into(), to, binary)
.await
.map_err(Into::into)
}
@@ -20,10 +20,10 @@ use crate::{
CliConfiguration, error, params::{PruningParams, SharedParams, BlockNumberOrHash},
};
use log::info;
use sc_service::{Configuration, ServiceBuilderCommand};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{fmt::Debug, str::FromStr, io::Write};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::{fmt::Debug, str::FromStr, io::Write, sync::Arc};
use structopt::StructOpt;
use sc_client_api::{StorageProvider, UsageProvider};
/// The `export-state` command used to export the state of a given block into
/// a chain spec.
@@ -44,23 +44,22 @@ pub struct ExportStateCmd {
impl ExportStateCmd {
/// Run the `export-state` command
pub fn run<B, BC, BB>(
pub async fn run<B, BA, C>(
&self,
config: Configuration,
builder: B,
client: Arc<C>,
mut input_spec: Box<dyn sc_service::ChainSpec>,
) -> error::Result<()>
where
B: FnOnce(Configuration) -> Result<BC, sc_service::error::Error>,
BC: ServiceBuilderCommand<Block = BB> + Unpin,
BB: BlockT + Debug,
<NumberFor<BB> as FromStr>::Err: std::fmt::Debug,
BB::Hash: FromStr,
<BB::Hash as FromStr>::Err: std::fmt::Debug,
B: BlockT,
C: UsageProvider<B> + StorageProvider<B, BA>,
BA: sc_client_api::backend::Backend<B>,
B::Hash: FromStr,
<B::Hash as FromStr>::Err: Debug,
<<B::Header as HeaderT>::Number as FromStr>::Err: Debug,
{
info!("Exporting raw state...");
let mut input_spec = config.chain_spec.cloned_box();
let block_id = self.input.as_ref().map(|b| b.parse()).transpose()?;
let raw_state = builder(config)?.export_raw_state(block_id)?;
let raw_state = sc_service::chain_ops::export_raw_state(client, block_id)?;
input_spec.set_storage(raw_state);
info!("Generating new chain spec...");
@@ -20,13 +20,15 @@ use crate::error;
use crate::params::ImportParams;
use crate::params::SharedParams;
use crate::CliConfiguration;
use sc_service::{Configuration, ServiceBuilderCommand};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sc_service::chain_ops::import_blocks;
use sp_runtime::traits::Block as BlockT;
use std::fmt::Debug;
use std::fs;
use std::io::{self, Read, Seek};
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;
use sc_client_api::UsageProvider;
/// The `import-blocks` command used to import blocks.
#[derive(Debug, StructOpt)]
@@ -61,17 +63,15 @@ impl<T: Read + Seek> ReadPlusSeek for T {}
impl ImportBlocksCmd {
/// Run the import-blocks command
pub async fn run<B, BC, BB>(
pub async fn run<B, C, IQ>(
&self,
config: Configuration,
builder: B,
client: Arc<C>,
import_queue: IQ,
) -> error::Result<()>
where
B: FnOnce(Configuration) -> Result<BC, sc_service::error::Error>,
BC: ServiceBuilderCommand<Block = BB> + Unpin,
BB: sp_runtime::traits::Block + Debug,
<<<BB as BlockT>::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug,
<BB as BlockT>::Hash: std::str::FromStr,
C: UsageProvider<B> + Send + Sync + 'static,
B: BlockT + for<'de> serde::Deserialize<'de>,
IQ: sc_service::ImportQueue<B> + 'static,
{
let file: Box<dyn ReadPlusSeek + Send> = match &self.input {
Some(filename) => Box::new(fs::File::open(filename)?),
@@ -82,8 +82,7 @@ impl ImportBlocksCmd {
}
};
builder(config)?
.import_blocks(file, false, self.binary)
import_blocks(client, import_queue, file, false, self.binary)
.await
.map_err(Into::into)
}
@@ -19,7 +19,7 @@
use crate::error;
use crate::params::{DatabaseParams, SharedParams};
use crate::CliConfiguration;
use sc_service::Configuration;
use sc_service::DatabaseConfig;
use std::fmt::Debug;
use std::fs;
use std::io::{self, Write};
@@ -43,8 +43,8 @@ pub struct PurgeChainCmd {
impl PurgeChainCmd {
/// Run the purge command
pub fn run(&self, config: Configuration) -> error::Result<()> {
let db_path = config.database.path()
pub fn run(&self, database_config: DatabaseConfig) -> error::Result<()> {
let db_path = database_config.path()
.ok_or_else(||
error::Error::Input("Cannot purge custom database implementation".into())
)?;
@@ -19,10 +19,13 @@
use crate::error;
use crate::params::{BlockNumber, PruningParams, SharedParams};
use crate::CliConfiguration;
use sc_service::{Configuration, ServiceBuilderCommand};
use sc_service::chain_ops::revert_chain;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use structopt::StructOpt;
use sc_client_api::{Backend, UsageProvider};
/// The `revert` command used revert the chain to a previous state.
#[derive(Debug, StructOpt)]
@@ -42,16 +45,19 @@ pub struct RevertCmd {
impl RevertCmd {
/// Run the revert command
pub fn run<B, BC, BB>(&self, config: Configuration, builder: B) -> error::Result<()>
pub async fn run<B, BA, C>(
&self,
client: Arc<C>,
backend: Arc<BA>,
) -> error::Result<()>
where
B: FnOnce(Configuration) -> Result<BC, sc_service::error::Error>,
BC: ServiceBuilderCommand<Block = BB> + Unpin,
BB: sp_runtime::traits::Block + Debug,
<<<BB as BlockT>::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug,
<BB as BlockT>::Hash: std::str::FromStr,
B: BlockT,
BA: Backend<B>,
C: UsageProvider<B>,
<<<B as BlockT>::Header as HeaderT>::Number as FromStr>::Err: Debug,
{
let blocks = self.num.parse()?;
builder(config)?.revert_chain(blocks)?;
revert_chain(client, backend, blocks)?;
Ok(())
}
+48 -19
View File
@@ -25,10 +25,11 @@ use futures::pin_mut;
use futures::select;
use futures::{future, future::FutureExt, Future};
use log::info;
use sc_service::{Configuration, ServiceBuilderCommand, TaskType, TaskManager};
use sc_service::{Configuration, TaskType, TaskManager};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
use std::{fmt::Debug, marker::PhantomData, str::FromStr};
use std::{fmt::Debug, marker::PhantomData, str::FromStr, sync::Arc};
use sc_client_api::{UsageProvider, BlockBackend, StorageProvider};
#[cfg(target_family = "unix")]
async fn main<F, E>(func: F) -> std::result::Result<(), Box<dyn std::error::Error>>
@@ -92,7 +93,11 @@ pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io::
.build()
}
fn run_until_exit<FUT, ERR>(mut tokio_runtime: tokio::runtime::Runtime, future: FUT) -> Result<()>
fn run_until_exit<FUT, ERR>(
mut tokio_runtime: tokio::runtime::Runtime,
future: FUT,
mut task_manager: TaskManager,
) -> Result<()>
where
FUT: Future<Output = std::result::Result<(), ERR>> + future::Future,
ERR: 'static + std::error::Error,
@@ -102,6 +107,9 @@ where
tokio_runtime.block_on(main(f)).map_err(|e| e.to_string())?;
task_manager.terminate();
drop(tokio_runtime);
Ok(())
}
@@ -173,29 +181,47 @@ impl<C: SubstrateCli> Runner<C> {
/// A helper function that runs a future with tokio and stops if the process receives the signal
/// `SIGTERM` or `SIGINT`.
pub fn run_subcommand<B, BC, BB>(self, subcommand: &Subcommand, builder: B) -> Result<()>
pub fn run_subcommand<BU, B, BA, IQ, CL>(self, subcommand: &Subcommand, builder: BU)
-> Result<()>
where
B: FnOnce(Configuration) -> sc_service::error::Result<BC>,
BC: ServiceBuilderCommand<Block = BB> + Unpin,
BB: sp_runtime::traits::Block + Debug,
<<<BB as BlockT>::Header as HeaderT>::Number as FromStr>::Err: Debug,
<BB as BlockT>::Hash: FromStr,
<<BB as BlockT>::Hash as FromStr>::Err: Debug,
BU: FnOnce(Configuration)
-> sc_service::error::Result<(Arc<CL>, Arc<BA>, IQ, TaskManager)>,
B: BlockT + for<'de> serde::Deserialize<'de>,
BA: sc_client_api::backend::Backend<B> + 'static,
IQ: sc_service::ImportQueue<B> + 'static,
<B as BlockT>::Hash: FromStr,
<<B as BlockT>::Hash as FromStr>::Err: Debug,
<<<B as BlockT>::Header as HeaderT>::Number as FromStr>::Err: Debug,
CL: UsageProvider<B> + BlockBackend<B> + StorageProvider<B, BA> + Send + Sync +
'static,
{
let chain_spec = self.config.chain_spec.cloned_box();
let network_config = self.config.network.clone();
let db_config = self.config.database.clone();
match subcommand {
Subcommand::BuildSpec(cmd) => cmd.run(self.config),
Subcommand::BuildSpec(cmd) => cmd.run(chain_spec, network_config),
Subcommand::ExportBlocks(cmd) => {
run_until_exit(self.tokio_runtime, cmd.run(self.config, builder))
let (client, _, _, task_manager) = builder(self.config)?;
run_until_exit(self.tokio_runtime, cmd.run(client, db_config), task_manager)
}
Subcommand::ImportBlocks(cmd) => {
run_until_exit(self.tokio_runtime, cmd.run(self.config, builder))
let (client, _, import_queue, task_manager) = builder(self.config)?;
run_until_exit(self.tokio_runtime, cmd.run(client, import_queue), task_manager)
}
Subcommand::CheckBlock(cmd) => {
run_until_exit(self.tokio_runtime, cmd.run(self.config, builder))
let (client, _, import_queue, task_manager) = builder(self.config)?;
run_until_exit(self.tokio_runtime, cmd.run(client, import_queue), task_manager)
}
Subcommand::Revert(cmd) => cmd.run(self.config, builder),
Subcommand::PurgeChain(cmd) => cmd.run(self.config),
Subcommand::ExportState(cmd) => cmd.run(self.config, builder),
Subcommand::Revert(cmd) => {
let (client, backend, _, task_manager) = builder(self.config)?;
run_until_exit(self.tokio_runtime, cmd.run(client, backend), task_manager)
},
Subcommand::PurgeChain(cmd) => cmd.run(db_config),
Subcommand::ExportState(cmd) => {
let (client, _, _, task_manager) = builder(self.config)?;
run_until_exit(self.tokio_runtime, cmd.run(client, chain_spec), task_manager)
},
}
}
@@ -221,11 +247,14 @@ impl<C: SubstrateCli> Runner<C> {
/// A helper function that runs a future with tokio and stops if the process receives
/// the signal SIGTERM or SIGINT
pub fn async_run<FUT>(self, runner: impl FnOnce(Configuration) -> FUT) -> Result<()>
pub fn async_run<FUT>(
self, runner: impl FnOnce(Configuration) -> Result<(FUT, TaskManager)>,
) -> Result<()>
where
FUT: Future<Output = Result<()>>,
{
run_until_exit(self.tokio_runtime, runner(self.config))
let (future, task_manager) = runner(self.config)?;
run_until_exit(self.tokio_runtime, future, task_manager)
}
/// Get an immutable reference to the node Configuration
+7 -51
View File
@@ -45,15 +45,11 @@ use sc_network::NetworkService;
use parking_lot::{Mutex, RwLock};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
Block as BlockT, NumberFor, SaturatedConversion, HashFor, Zero, BlockIdTo,
Block as BlockT, SaturatedConversion, HashFor, Zero, BlockIdTo,
};
use sp_api::{ProvideRuntimeApi, CallApiAt};
use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo};
use std::{
collections::HashMap,
io::{Read, Write, Seek},
marker::PhantomData, sync::Arc, pin::Pin
};
use std::{collections::HashMap, marker::PhantomData, sync::Arc, pin::Pin};
use wasm_timer::SystemTime;
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::{LocalTransactionPool, MaintainedTransactionPool};
@@ -67,7 +63,6 @@ use sc_client_api::{
proof_provider::ProofProvider,
execution_extensions::ExecutionExtensions
};
use sp_core::storage::Storage;
use sp_blockchain::{HeaderMetadata, HeaderBackend};
use crate::{ServiceComponents, TelemetryOnConnectSinks, RpcHandlers, NetworkStatusSinks};
@@ -523,6 +518,11 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
self.remote_backend.clone()
}
/// Consume the builder and return the parts needed for chain operations.
pub fn to_chain_ops_parts(self) -> (Arc<TCl>, Arc<Backend>, TImpQu, TaskManager) {
(self.client, self.backend, self.import_queue, self.task_manager)
}
/// Defines which head-of-chain strategy to use.
pub fn with_opt_select_chain<USc>(
self,
@@ -840,50 +840,6 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
}
}
/// Implemented on `ServiceBuilder`. Allows running block commands, such as import/export/validate
/// components to the builder.
pub trait ServiceBuilderCommand {
/// Block type this API operates on.
type Block: BlockT;
/// Native execution dispatch required by some commands.
type NativeDispatch: NativeExecutionDispatch + 'static;
/// Starts the process of importing blocks.
fn import_blocks(
self,
input: impl Read + Seek + Send + 'static,
force: bool,
binary: bool,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
/// Performs the blocks export.
fn export_blocks(
self,
output: impl Write + 'static,
from: NumberFor<Self::Block>,
to: Option<NumberFor<Self::Block>>,
binary: bool
) -> Pin<Box<dyn Future<Output = Result<(), Error>>>>;
/// Performs a revert of `blocks` blocks.
fn revert_chain(
&self,
blocks: NumberFor<Self::Block>
) -> Result<(), Error>;
/// Re-validate known block.
fn check_block(
self,
block: BlockId<Self::Block>
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
/// Export the raw state at the given `block`. If `block` is `None`, the
/// best block will be used.
fn export_raw_state(
&self,
block: Option<BlockId<Self::Block>>,
) -> Result<Storage, Error>;
}
impl<TBl, TRtApi, TBackend, TSc, TImpQu, TExPool, TRpc, TCl>
ServiceBuilder<
TBl,
-614
View File
@@ -1,614 +0,0 @@
// This file is part of Substrate.
// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Chain utilities.
use crate::error;
use crate::builder::{ServiceBuilderCommand, ServiceBuilder};
use crate::error::Error;
use sc_chain_spec::ChainSpec;
use log::{warn, info};
use futures::{future, prelude::*};
use sp_runtime::traits::{
Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion, MaybeSerializeDeserialize,
};
use sp_runtime::generic::{BlockId, SignedBlock};
use codec::{Decode, Encode, IoReader as CodecIoReader};
use crate::client::{Client, LocalCallExecutor};
use sp_consensus::{
BlockOrigin,
import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue},
};
use sc_executor::{NativeExecutor, NativeExecutionDispatch};
use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageChild, StorageMap};
use sc_client_api::{StorageProvider, BlockBackend, UsageProvider};
use std::{io::{Read, Write, Seek}, pin::Pin, collections::HashMap};
use std::time::{Duration, Instant};
use futures_timer::Delay;
use std::task::Poll;
use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
use std::convert::{TryFrom, TryInto};
use sp_runtime::traits::{CheckedDiv, Saturating};
/// Number of blocks we will add to the queue before waiting for the queue to catch up.
const MAX_PENDING_BLOCKS: u64 = 1_024;
/// Number of milliseconds to wait until next poll.
const DELAY_TIME: u64 = 2_000;
/// Number of milliseconds that must have passed between two updates.
const TIME_BETWEEN_UPDATES: u64 = 3_000;
/// Build a chain spec json
pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result<String> {
spec.as_json(raw).map_err(Into::into)
}
/// Helper enum that wraps either a binary decoder (from parity-scale-codec), or a JSON decoder (from serde_json).
/// Implements the Iterator Trait, calling `next()` will decode the next SignedBlock and return it.
enum BlockIter<R, B> where
R: std::io::Read + std::io::Seek,
{
Binary {
// Total number of blocks we are expecting to decode.
num_expected_blocks: u64,
// Number of blocks we have decoded thus far.
read_block_count: u64,
// Reader to the data, used for decoding new blocks.
reader: CodecIoReader<R>,
},
Json {
// Nubmer of blocks we have decoded thus far.
read_block_count: u64,
// Stream to the data, used for decoding new blocks.
reader: StreamDeserializer<'static, JsonIoRead<R>, SignedBlock<B>>,
},
}
impl<R, B> BlockIter<R, B> where
R: Read + Seek + 'static,
B: BlockT + MaybeSerializeDeserialize,
{
fn new(input: R, binary: bool) -> Result<Self, String> {
if binary {
let mut reader = CodecIoReader(input);
// If the file is encoded in binary format, it is expected to first specify the number
// of blocks that are going to be decoded. We read it and add it to our enum struct.
let num_expected_blocks: u64 = Decode::decode(&mut reader)
.map_err(|e| format!("Failed to decode the number of blocks: {:?}", e))?;
Ok(BlockIter::Binary {
num_expected_blocks,
read_block_count: 0,
reader,
})
} else {
let stream_deser = Deserializer::from_reader(input)
.into_iter::<SignedBlock<B>>();
Ok(BlockIter::Json {
reader: stream_deser,
read_block_count: 0,
})
}
}
/// Returns the number of blocks read thus far.
fn read_block_count(&self) -> u64 {
match self {
BlockIter::Binary { read_block_count, .. }
| BlockIter::Json { read_block_count, .. }
=> *read_block_count,
}
}
/// Returns the total number of blocks to be imported, if possible.
fn num_expected_blocks(&self) -> Option<u64> {
match self {
BlockIter::Binary { num_expected_blocks, ..} => Some(*num_expected_blocks),
BlockIter::Json {..} => None
}
}
}
impl<R, B> Iterator for BlockIter<R, B> where
R: Read + Seek + 'static,
B: BlockT + MaybeSerializeDeserialize,
{
type Item = Result<SignedBlock<B>, String>;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::Binary { num_expected_blocks, read_block_count, reader } => {
if read_block_count < num_expected_blocks {
let block_result: Result<SignedBlock::<B>, _> = SignedBlock::<B>::decode(reader)
.map_err(|e| e.to_string());
*read_block_count += 1;
Some(block_result)
} else {
// `read_block_count` == `num_expected_blocks` so we've read enough blocks.
None
}
}
BlockIter::Json { reader, read_block_count } => {
let res = Some(reader.next()?.map_err(|e| e.to_string()));
*read_block_count += 1;
res
}
}
}
}
/// Imports the SignedBlock to the queue.
fn import_block_to_queue<TBl, TImpQu>(
signed_block: SignedBlock<TBl>,
queue: &mut TImpQu,
force: bool
) where
TBl: BlockT + MaybeSerializeDeserialize,
TImpQu: 'static + ImportQueue<TBl>,
{
let (header, extrinsics) = signed_block.block.deconstruct();
let hash = header.hash();
// import queue handles verification and importing it into the client.
queue.import_blocks(BlockOrigin::File, vec![
IncomingBlock::<TBl> {
hash,
header: Some(header),
body: Some(extrinsics),
justification: signed_block.justification,
origin: None,
allow_missing_state: false,
import_existing: force,
}
]);
}
/// Returns true if we have imported every block we were supposed to import, else returns false.
fn importing_is_done(
num_expected_blocks: Option<u64>,
read_block_count: u64,
imported_blocks: u64
) -> bool {
if let Some(num_expected_blocks) = num_expected_blocks {
imported_blocks >= num_expected_blocks
} else {
imported_blocks >= read_block_count
}
}
/// Structure used to log the block importing speed.
struct Speedometer<B: BlockT> {
best_number: NumberFor<B>,
last_number: Option<NumberFor<B>>,
last_update: Instant,
}
impl<B: BlockT> Speedometer<B> {
/// Creates a fresh Speedometer.
fn new() -> Self {
Self {
best_number: NumberFor::<B>::from(0),
last_number: None,
last_update: Instant::now(),
}
}
/// Calculates `(best_number - last_number) / (now - last_update)` and
/// logs the speed of import.
fn display_speed(&self) {
// Number of milliseconds elapsed since last time.
let elapsed_ms = {
let elapsed = self.last_update.elapsed();
let since_last_millis = elapsed.as_secs() * 1000;
let since_last_subsec_millis = elapsed.subsec_millis() as u64;
since_last_millis + since_last_subsec_millis
};
// Number of blocks that have been imported since last time.
let diff = match self.last_number {
None => return,
Some(n) => self.best_number.saturating_sub(n)
};
if let Ok(diff) = TryInto::<u128>::try_into(diff) {
// If the number of blocks can be converted to a regular integer, then it's easy: just
// do the math and turn it into a `f64`.
let speed = diff.saturating_mul(10_000).checked_div(u128::from(elapsed_ms))
.map_or(0.0, |s| s as f64) / 10.0;
info!("📦 Current best block: {} ({:4.1} bps)", self.best_number, speed);
} else {
// If the number of blocks can't be converted to a regular integer, then we need a more
// algebraic approach and we stay within the realm of integers.
let one_thousand = NumberFor::<B>::from(1_000);
let elapsed = NumberFor::<B>::from(
<u32 as TryFrom<_>>::try_from(elapsed_ms).unwrap_or(u32::max_value())
);
let speed = diff.saturating_mul(one_thousand).checked_div(&elapsed)
.unwrap_or_else(Zero::zero);
info!("📦 Current best block: {} ({} bps)", self.best_number, speed)
}
}
/// Updates the Speedometer.
fn update(&mut self, best_number: NumberFor<B>) {
self.last_number = Some(self.best_number);
self.best_number = best_number;
self.last_update = Instant::now();
}
// If more than TIME_BETWEEN_UPDATES has elapsed since last update,
// then print and update the speedometer.
fn notify_user(&mut self, best_number: NumberFor<B>) {
let delta = Duration::from_millis(TIME_BETWEEN_UPDATES);
if Instant::now().duration_since(self.last_update) >= delta {
self.display_speed();
self.update(best_number);
}
}
}
/// Different State that the `import_blocks` future could be in.
enum ImportState<R, B> where
R: Read + Seek + 'static,
B: BlockT + MaybeSerializeDeserialize,
{
/// We are reading from the BlockIter structure, adding those blocks to the queue if possible.
Reading{block_iter: BlockIter<R, B>},
/// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to catch up.
WaitingForImportQueueToCatchUp{
block_iter: BlockIter<R, B>,
delay: Delay,
block: SignedBlock<B>
},
// We have added all the blocks to the queue but they are still being processed.
WaitingForImportQueueToFinish{
num_expected_blocks: Option<u64>,
read_block_count: u64,
delay: Delay,
},
}
impl<
TBl, TRtApi, TBackend,
TExecDisp, TFchr, TSc, TImpQu, TFprb, TFpp,
TExPool, TRpc, Backend
> ServiceBuilderCommand for ServiceBuilder<
TBl, TRtApi,
Client<TBackend, LocalCallExecutor<TBackend, NativeExecutor<TExecDisp>>, TBl, TRtApi>,
TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend
> where
TBl: BlockT + MaybeSerializeDeserialize,
TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
TExecDisp: 'static + NativeExecutionDispatch,
TImpQu: 'static + ImportQueue<TBl>,
TRtApi: 'static + Send + Sync,
Self: Send + 'static,
{
type Block = TBl;
type NativeDispatch = TExecDisp;
fn import_blocks(
mut self,
input: impl Read + Seek + Send + 'static,
force: bool,
binary: bool,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
struct WaitLink {
imported_blocks: u64,
has_error: bool,
}
impl WaitLink {
fn new() -> WaitLink {
WaitLink {
imported_blocks: 0,
has_error: false,
}
}
}
impl<B: BlockT> Link<B> for WaitLink {
fn blocks_processed(
&mut self,
imported: usize,
_num_expected_blocks: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {
self.imported_blocks += imported as u64;
for result in results {
if let (Err(err), hash) = result {
warn!("There was an error importing block with hash {:?}: {:?}", hash, err);
self.has_error = true;
break;
}
}
}
}
let mut link = WaitLink::new();
let block_iter_res: Result<BlockIter<_, Self::Block>, String> = BlockIter::new(input, binary);
let block_iter = match block_iter_res {
Ok(block_iter) => block_iter,
Err(e) => {
// We've encountered an error while creating the block iterator
// so we can just return a future that returns an error.
return future::ready(Err(Error::Other(e))).boxed()
}
};
let mut state = Some(ImportState::Reading{block_iter});
let mut speedometer = Speedometer::<TBl>::new();
// Importing blocks is implemented as a future, because we want the operation to be
// interruptible.
//
// Every time we read a block from the input or import a bunch of blocks from the import
// queue, the `Future` re-schedules itself and returns `Poll::Pending`.
// This makes it possible either to interleave other operations in-between the block imports,
// or to stop the operation completely.
let import = future::poll_fn(move |cx| {
let client = &self.client;
let queue = &mut self.import_queue;
match state.take().expect("state should never be None; qed") {
ImportState::Reading{mut block_iter} => {
match block_iter.next() {
None => {
// The iterator is over: we now need to wait for the import queue to finish.
let num_expected_blocks = block_iter.num_expected_blocks();
let read_block_count = block_iter.read_block_count();
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
},
Some(block_result) => {
let read_block_count = block_iter.read_block_count();
match block_result {
Ok(block) => {
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
// The queue is full, so do not add this block and simply wait until
// the queue has made some progress.
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
} else {
// Queue is not full, we can keep on adding blocks to the queue.
import_block_to_queue(block, queue, force);
state = Some(ImportState::Reading{block_iter});
}
}
Err(e) => {
return Poll::Ready(
Err(Error::Other(format!("Error reading block #{}: {}", read_block_count, e))))
}
}
}
}
},
ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => {
let read_block_count = block_iter.read_block_count();
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
// Queue is still full, so wait until there is room to insert our block.
match Pin::new(&mut delay).poll(cx) {
Poll::Pending => {
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
return Poll::Pending
},
Poll::Ready(_) => {
delay.reset(Duration::from_millis(DELAY_TIME));
},
}
state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
} else {
// Queue is no longer full, so we can add our block to the queue.
import_block_to_queue(block, queue, force);
// Switch back to Reading state.
state = Some(ImportState::Reading{block_iter});
}
},
ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, mut delay} => {
// All the blocks have been added to the queue, which doesn't mean they
// have all been properly imported.
if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) {
// Importing is done, we can log the result and return.
info!(
"🎉 Imported {} blocks. Best: #{}",
read_block_count, client.chain_info().best_number
);
return Poll::Ready(Ok(()))
} else {
// Importing is not done, we still have to wait for the queue to finish.
// Wait for the delay, because we know the queue is lagging behind.
match Pin::new(&mut delay).poll(cx) {
Poll::Pending => {
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
return Poll::Pending
},
Poll::Ready(_) => {
delay.reset(Duration::from_millis(DELAY_TIME));
},
}
state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
}
}
}
queue.poll_actions(cx, &mut link);
let best_number = client.chain_info().best_number;
speedometer.notify_user(best_number);
if link.has_error {
return Poll::Ready(Err(
Error::Other(
format!("Stopping after #{} blocks because of an error", link.imported_blocks)
)
))
}
cx.waker().wake_by_ref();
Poll::Pending
});
Box::pin(import)
}
fn export_blocks(
self,
mut output: impl Write + 'static,
from: NumberFor<TBl>,
to: Option<NumberFor<TBl>>,
binary: bool
) -> Pin<Box<dyn Future<Output = Result<(), Error>>>> {
let mut block = from;
let last = match to {
Some(v) if v.is_zero() => One::one(),
Some(v) => v,
None => self.client.chain_info().best_number,
};
let mut wrote_header = false;
// Exporting blocks is implemented as a future, because we want the operation to be
// interruptible.
//
// Every time we write a block to the output, the `Future` re-schedules itself and returns
// `Poll::Pending`.
// This makes it possible either to interleave other operations in-between the block exports,
// or to stop the operation completely.
let export = future::poll_fn(move |cx| {
let client = &self.client;
if last < block {
return Poll::Ready(Err("Invalid block range specified".into()));
}
if !wrote_header {
info!("Exporting blocks from #{} to #{}", block, last);
if binary {
let last_: u64 = last.saturated_into::<u64>();
let block_: u64 = block.saturated_into::<u64>();
let len: u64 = last_ - block_ + 1;
output.write_all(&len.encode())?;
}
wrote_header = true;
}
match client.block(&BlockId::number(block))? {
Some(block) => {
if binary {
output.write_all(&block.encode())?;
} else {
serde_json::to_writer(&mut output, &block)
.map_err(|e| format!("Error writing JSON: {}", e))?;
}
},
// Reached end of the chain.
None => return Poll::Ready(Ok(())),
}
if (block % 10000.into()).is_zero() {
info!("#{}", block);
}
if block == last {
return Poll::Ready(Ok(()));
}
block += One::one();
// Re-schedule the task in order to continue the operation.
cx.waker().wake_by_ref();
Poll::Pending
});
Box::pin(export)
}
fn revert_chain(
&self,
blocks: NumberFor<TBl>
) -> Result<(), Error> {
let reverted = self.client.revert(blocks)?;
let info = self.client.chain_info();
if reverted.is_zero() {
info!("There aren't any non-finalized blocks to revert.");
} else {
info!("Reverted {} blocks. Best: #{} ({})", reverted, info.best_number, info.best_hash);
}
Ok(())
}
fn check_block(
self,
block_id: BlockId<TBl>
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
match self.client.block(&block_id) {
Ok(Some(block)) => {
let mut buf = Vec::new();
1u64.encode_to(&mut buf);
block.encode_to(&mut buf);
let reader = std::io::Cursor::new(buf);
self.import_blocks(reader, true, true)
}
Ok(None) => Box::pin(future::err("Unknown block".into())),
Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())),
}
}
fn export_raw_state(
&self,
block: Option<BlockId<Self::Block>>,
) -> Result<Storage, Error> {
let block = block.unwrap_or_else(
|| BlockId::Hash(self.client.usage_info().chain.best_hash)
);
let empty_key = StorageKey(Vec::new());
let mut top_storage = self.client.storage_pairs(&block, &empty_key)?;
let mut children_default = HashMap::new();
// Remove all default child storage roots from the top storage and collect the child storage
// pairs.
while let Some(pos) = top_storage
.iter()
.position(|(k, _)| k.0.starts_with(well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX)) {
let (key, _) = top_storage.swap_remove(pos);
let key = StorageKey(
key.0[well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.len()..].to_vec(),
);
let child_info = ChildInfo::new_default(&key.0);
let keys = self.client.child_storage_keys(&block, &child_info, &empty_key)?;
let mut pairs = StorageMap::new();
keys.into_iter().try_for_each(|k| {
if let Some(value) = self.client.child_storage(&block, &child_info, &k)? {
pairs.insert(k.0, value.0);
}
Ok::<_, Error>(())
})?;
children_default.insert(key.0, StorageChild { child_info, data: pairs });
}
let top = top_storage.into_iter().map(|(k, v)| (k.0, v.0)).collect();
Ok(Storage { top, children_default })
}
}
@@ -0,0 +1,51 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::error::Error;
use futures::{future, prelude::*};
use sp_runtime::traits::Block as BlockT;
use sp_runtime::generic::BlockId;
use codec::Encode;
use sp_consensus::import_queue::ImportQueue;
use sc_client_api::{BlockBackend, UsageProvider};
use std::pin::Pin;
use std::sync::Arc;
use crate::chain_ops::import_blocks;
/// Re-validate known block.
pub fn check_block<B, IQ, C>(
client: Arc<C>,
import_queue: IQ,
block_id: BlockId<B>
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
where
C: BlockBackend<B> + UsageProvider<B> + Send + Sync + 'static,
B: BlockT + for<'de> serde::Deserialize<'de>,
IQ: ImportQueue<B> + 'static,
{
match client.block(&block_id) {
Ok(Some(block)) => {
let mut buf = Vec::new();
1u64.encode_to(&mut buf);
block.encode_to(&mut buf);
let reader = std::io::Cursor::new(buf);
import_blocks(client, import_queue, reader, true, true)
}
Ok(None) => Box::pin(future::err("Unknown block".into())),
Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())),
}
}
@@ -0,0 +1,104 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::error::Error;
use log::info;
use futures::{future, prelude::*};
use sp_runtime::traits::{
Block as BlockT, NumberFor, One, Zero, SaturatedConversion
};
use sp_runtime::generic::BlockId;
use codec::Encode;
use std::{io::Write, pin::Pin};
use sc_client_api::{BlockBackend, UsageProvider};
use std::sync::Arc;
use std::task::Poll;
/// Performs the blocks export.
pub fn export_blocks<B, C>(
client: Arc<C>,
mut output: impl Write + 'static,
from: NumberFor<B>,
to: Option<NumberFor<B>>,
binary: bool
) -> Pin<Box<dyn Future<Output = Result<(), Error>>>>
where
C: BlockBackend<B> + UsageProvider<B> + 'static,
B: BlockT,
{
let mut block = from;
let last = match to {
Some(v) if v.is_zero() => One::one(),
Some(v) => v,
None => client.usage_info().chain.best_number,
};
let mut wrote_header = false;
// Exporting blocks is implemented as a future, because we want the operation to be
// interruptible.
//
// Every time we write a block to the output, the `Future` re-schedules itself and returns
// `Poll::Pending`.
// This makes it possible either to interleave other operations in-between the block exports,
// or to stop the operation completely.
let export = future::poll_fn(move |cx| {
let client = &client;
if last < block {
return Poll::Ready(Err("Invalid block range specified".into()));
}
if !wrote_header {
info!("Exporting blocks from #{} to #{}", block, last);
if binary {
let last_: u64 = last.saturated_into::<u64>();
let block_: u64 = block.saturated_into::<u64>();
let len: u64 = last_ - block_ + 1;
output.write_all(&len.encode())?;
}
wrote_header = true;
}
match client.block(&BlockId::number(block))? {
Some(block) => {
if binary {
output.write_all(&block.encode())?;
} else {
serde_json::to_writer(&mut output, &block)
.map_err(|e| format!("Error writing JSON: {}", e))?;
}
},
// Reached end of the chain.
None => return Poll::Ready(Ok(())),
}
if (block % 10000.into()).is_zero() {
info!("#{}", block);
}
if block == last {
return Poll::Ready(Ok(()));
}
block += One::one();
// Re-schedule the task in order to continue the operation.
cx.waker().wake_by_ref();
Poll::Pending
});
Box::pin(export)
}
@@ -0,0 +1,71 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::error::Error;
use sp_runtime::traits::Block as BlockT;
use sp_runtime::generic::BlockId;
use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageChild, StorageMap};
use sc_client_api::{StorageProvider, UsageProvider};
use std::{collections::HashMap, sync::Arc};
/// Export the raw state at the given `block`. If `block` is `None`, the
/// best block will be used.
pub fn export_raw_state<B, BA, C>(
client: Arc<C>,
block: Option<BlockId<B>>,
) -> Result<Storage, Error>
where
C: UsageProvider<B> + StorageProvider<B, BA>,
B: BlockT,
BA: sc_client_api::backend::Backend<B>,
{
let block = block.unwrap_or_else(
|| BlockId::Hash(client.usage_info().chain.best_hash)
);
let empty_key = StorageKey(Vec::new());
let mut top_storage = client.storage_pairs(&block, &empty_key)?;
let mut children_default = HashMap::new();
// Remove all default child storage roots from the top storage and collect the child storage
// pairs.
while let Some(pos) = top_storage
.iter()
.position(|(k, _)| k.0.starts_with(well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX)) {
let (key, _) = top_storage.swap_remove(pos);
let key = StorageKey(
key.0[well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.len()..].to_vec(),
);
let child_info = ChildInfo::new_default(&key.0);
let keys = client.child_storage_keys(&block, &child_info, &empty_key)?;
let mut pairs = StorageMap::new();
keys.into_iter().try_for_each(|k| {
if let Some(value) = client.child_storage(&block, &child_info, &k)? {
pairs.insert(k.0, value.0);
}
Ok::<_, Error>(())
})?;
children_default.insert(key.0, StorageChild { child_info, data: pairs });
}
let top = top_storage.into_iter().map(|(k, v)| (k.0, v.0)).collect();
Ok(Storage { top, children_default })
}
@@ -0,0 +1,472 @@
// This file is part of Substrate.
// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::error;
use crate::error::Error;
use sc_chain_spec::ChainSpec;
use log::{warn, info};
use futures::{future, prelude::*};
use sp_runtime::traits::{
Block as BlockT, NumberFor, Zero, Header, MaybeSerializeDeserialize,
};
use sp_runtime::generic::SignedBlock;
use codec::{Decode, IoReader as CodecIoReader};
use sp_consensus::{
BlockOrigin,
import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue},
};
use std::{io::{Read, Seek}, pin::Pin};
use std::time::{Duration, Instant};
use futures_timer::Delay;
use std::task::Poll;
use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
use std::convert::{TryFrom, TryInto};
use sp_runtime::traits::{CheckedDiv, Saturating};
use sc_client_api::UsageProvider;
/// Number of blocks we will add to the queue before waiting for the queue to catch up.
const MAX_PENDING_BLOCKS: u64 = 1_024;
/// Number of milliseconds to wait until next poll.
const DELAY_TIME: u64 = 2_000;
/// Number of milliseconds that must have passed between two updates.
const TIME_BETWEEN_UPDATES: u64 = 3_000;
use std::sync::Arc;
/// Build a chain spec json
pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result<String> {
spec.as_json(raw).map_err(Into::into)
}
/// Helper enum that wraps either a binary decoder (from parity-scale-codec), or a JSON decoder
/// (from serde_json). Implements the Iterator Trait, calling `next()` will decode the next
/// SignedBlock and return it.
enum BlockIter<R, B> where
R: std::io::Read + std::io::Seek,
{
Binary {
// Total number of blocks we are expecting to decode.
num_expected_blocks: u64,
// Number of blocks we have decoded thus far.
read_block_count: u64,
// Reader to the data, used for decoding new blocks.
reader: CodecIoReader<R>,
},
Json {
// Nubmer of blocks we have decoded thus far.
read_block_count: u64,
// Stream to the data, used for decoding new blocks.
reader: StreamDeserializer<'static, JsonIoRead<R>, SignedBlock<B>>,
},
}
impl<R, B> BlockIter<R, B> where
R: Read + Seek + 'static,
B: BlockT + MaybeSerializeDeserialize,
{
fn new(input: R, binary: bool) -> Result<Self, String> {
if binary {
let mut reader = CodecIoReader(input);
// If the file is encoded in binary format, it is expected to first specify the number
// of blocks that are going to be decoded. We read it and add it to our enum struct.
let num_expected_blocks: u64 = Decode::decode(&mut reader)
.map_err(|e| format!("Failed to decode the number of blocks: {:?}", e))?;
Ok(BlockIter::Binary {
num_expected_blocks,
read_block_count: 0,
reader,
})
} else {
let stream_deser = Deserializer::from_reader(input)
.into_iter::<SignedBlock<B>>();
Ok(BlockIter::Json {
reader: stream_deser,
read_block_count: 0,
})
}
}
/// Returns the number of blocks read thus far.
fn read_block_count(&self) -> u64 {
match self {
BlockIter::Binary { read_block_count, .. }
| BlockIter::Json { read_block_count, .. }
=> *read_block_count,
}
}
/// Returns the total number of blocks to be imported, if possible.
fn num_expected_blocks(&self) -> Option<u64> {
match self {
BlockIter::Binary { num_expected_blocks, ..} => Some(*num_expected_blocks),
BlockIter::Json {..} => None
}
}
}
impl<R, B> Iterator for BlockIter<R, B> where
R: Read + Seek + 'static,
B: BlockT + MaybeSerializeDeserialize,
{
type Item = Result<SignedBlock<B>, String>;
fn next(&mut self) -> Option<Self::Item> {
match self {
BlockIter::Binary { num_expected_blocks, read_block_count, reader } => {
if read_block_count < num_expected_blocks {
let block_result: Result<SignedBlock::<B>, _> = SignedBlock::<B>::decode(reader)
.map_err(|e| e.to_string());
*read_block_count += 1;
Some(block_result)
} else {
// `read_block_count` == `num_expected_blocks` so we've read enough blocks.
None
}
}
BlockIter::Json { reader, read_block_count } => {
let res = Some(reader.next()?.map_err(|e| e.to_string()));
*read_block_count += 1;
res
}
}
}
}
/// Imports the SignedBlock to the queue.
fn import_block_to_queue<TBl, TImpQu>(
signed_block: SignedBlock<TBl>,
queue: &mut TImpQu,
force: bool
) where
TBl: BlockT + MaybeSerializeDeserialize,
TImpQu: 'static + ImportQueue<TBl>,
{
let (header, extrinsics) = signed_block.block.deconstruct();
let hash = header.hash();
// import queue handles verification and importing it into the client.
queue.import_blocks(BlockOrigin::File, vec![
IncomingBlock::<TBl> {
hash,
header: Some(header),
body: Some(extrinsics),
justification: signed_block.justification,
origin: None,
allow_missing_state: false,
import_existing: force,
}
]);
}
/// Returns true if we have imported every block we were supposed to import, else returns false.
fn importing_is_done(
num_expected_blocks: Option<u64>,
read_block_count: u64,
imported_blocks: u64
) -> bool {
if let Some(num_expected_blocks) = num_expected_blocks {
imported_blocks >= num_expected_blocks
} else {
imported_blocks >= read_block_count
}
}
/// Structure used to log the block importing speed.
struct Speedometer<B: BlockT> {
best_number: NumberFor<B>,
last_number: Option<NumberFor<B>>,
last_update: Instant,
}
impl<B: BlockT> Speedometer<B> {
/// Creates a fresh Speedometer.
fn new() -> Self {
Self {
best_number: NumberFor::<B>::from(0),
last_number: None,
last_update: Instant::now(),
}
}
/// Calculates `(best_number - last_number) / (now - last_update)` and
/// logs the speed of import.
fn display_speed(&self) {
// Number of milliseconds elapsed since last time.
let elapsed_ms = {
let elapsed = self.last_update.elapsed();
let since_last_millis = elapsed.as_secs() * 1000;
let since_last_subsec_millis = elapsed.subsec_millis() as u64;
since_last_millis + since_last_subsec_millis
};
// Number of blocks that have been imported since last time.
let diff = match self.last_number {
None => return,
Some(n) => self.best_number.saturating_sub(n)
};
if let Ok(diff) = TryInto::<u128>::try_into(diff) {
// If the number of blocks can be converted to a regular integer, then it's easy: just
// do the math and turn it into a `f64`.
let speed = diff.saturating_mul(10_000).checked_div(u128::from(elapsed_ms))
.map_or(0.0, |s| s as f64) / 10.0;
info!("📦 Current best block: {} ({:4.1} bps)", self.best_number, speed);
} else {
// If the number of blocks can't be converted to a regular integer, then we need a more
// algebraic approach and we stay within the realm of integers.
let one_thousand = NumberFor::<B>::from(1_000);
let elapsed = NumberFor::<B>::from(
<u32 as TryFrom<_>>::try_from(elapsed_ms).unwrap_or(u32::max_value())
);
let speed = diff.saturating_mul(one_thousand).checked_div(&elapsed)
.unwrap_or_else(Zero::zero);
info!("📦 Current best block: {} ({} bps)", self.best_number, speed)
}
}
/// Updates the Speedometer.
fn update(&mut self, best_number: NumberFor<B>) {
self.last_number = Some(self.best_number);
self.best_number = best_number;
self.last_update = Instant::now();
}
// If more than TIME_BETWEEN_UPDATES has elapsed since last update,
// then print and update the speedometer.
fn notify_user(&mut self, best_number: NumberFor<B>) {
let delta = Duration::from_millis(TIME_BETWEEN_UPDATES);
if Instant::now().duration_since(self.last_update) >= delta {
self.display_speed();
self.update(best_number);
}
}
}
/// Different State that the `import_blocks` future could be in.
enum ImportState<R, B> where
R: Read + Seek + 'static,
B: BlockT + MaybeSerializeDeserialize,
{
/// We are reading from the BlockIter structure, adding those blocks to the queue if possible.
Reading{block_iter: BlockIter<R, B>},
/// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to
/// catch up.
WaitingForImportQueueToCatchUp{
block_iter: BlockIter<R, B>,
delay: Delay,
block: SignedBlock<B>
},
// We have added all the blocks to the queue but they are still being processed.
WaitingForImportQueueToFinish{
num_expected_blocks: Option<u64>,
read_block_count: u64,
delay: Delay,
},
}
/// Starts the process of importing blocks.
pub fn import_blocks<B, IQ, C>(
client: Arc<C>,
mut import_queue: IQ,
input: impl Read + Seek + Send + 'static,
force: bool,
binary: bool,
) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
where
C: UsageProvider<B> + Send + Sync + 'static,
B: BlockT + for<'de> serde::Deserialize<'de>,
IQ: ImportQueue<B> + 'static,
{
struct WaitLink {
imported_blocks: u64,
has_error: bool,
}
impl WaitLink {
fn new() -> WaitLink {
WaitLink {
imported_blocks: 0,
has_error: false,
}
}
}
impl<B: BlockT> Link<B> for WaitLink {
fn blocks_processed(
&mut self,
imported: usize,
_num_expected_blocks: usize,
results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
) {
self.imported_blocks += imported as u64;
for result in results {
if let (Err(err), hash) = result {
warn!("There was an error importing block with hash {:?}: {:?}", hash, err);
self.has_error = true;
break;
}
}
}
}
let mut link = WaitLink::new();
let block_iter_res: Result<BlockIter<_, B>, String> = BlockIter::new(input, binary);
let block_iter = match block_iter_res {
Ok(block_iter) => block_iter,
Err(e) => {
// We've encountered an error while creating the block iterator
// so we can just return a future that returns an error.
return future::ready(Err(Error::Other(e))).boxed()
}
};
let mut state = Some(ImportState::Reading{block_iter});
let mut speedometer = Speedometer::<B>::new();
// Importing blocks is implemented as a future, because we want the operation to be
// interruptible.
//
// Every time we read a block from the input or import a bunch of blocks from the import
// queue, the `Future` re-schedules itself and returns `Poll::Pending`.
// This makes it possible either to interleave other operations in-between the block imports,
// or to stop the operation completely.
let import = future::poll_fn(move |cx| {
let client = &client;
let queue = &mut import_queue;
match state.take().expect("state should never be None; qed") {
ImportState::Reading{mut block_iter} => {
match block_iter.next() {
None => {
// The iterator is over: we now need to wait for the import queue to finish.
let num_expected_blocks = block_iter.num_expected_blocks();
let read_block_count = block_iter.read_block_count();
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
state = Some(ImportState::WaitingForImportQueueToFinish {
num_expected_blocks, read_block_count, delay
});
},
Some(block_result) => {
let read_block_count = block_iter.read_block_count();
match block_result {
Ok(block) => {
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
// The queue is full, so do not add this block and simply wait
// until the queue has made some progress.
let delay = Delay::new(Duration::from_millis(DELAY_TIME));
state = Some(ImportState::WaitingForImportQueueToCatchUp {
block_iter, delay, block
});
} else {
// Queue is not full, we can keep on adding blocks to the queue.
import_block_to_queue(block, queue, force);
state = Some(ImportState::Reading{block_iter});
}
}
Err(e) => {
return Poll::Ready(
Err(Error::Other(
format!("Error reading block #{}: {}", read_block_count, e)
)))
}
}
}
}
},
ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => {
let read_block_count = block_iter.read_block_count();
if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
// Queue is still full, so wait until there is room to insert our block.
match Pin::new(&mut delay).poll(cx) {
Poll::Pending => {
state = Some(ImportState::WaitingForImportQueueToCatchUp {
block_iter, delay, block
});
return Poll::Pending
},
Poll::Ready(_) => {
delay.reset(Duration::from_millis(DELAY_TIME));
},
}
state = Some(ImportState::WaitingForImportQueueToCatchUp {
block_iter, delay, block
});
} else {
// Queue is no longer full, so we can add our block to the queue.
import_block_to_queue(block, queue, force);
// Switch back to Reading state.
state = Some(ImportState::Reading{block_iter});
}
},
ImportState::WaitingForImportQueueToFinish {
num_expected_blocks, read_block_count, mut delay
} => {
// All the blocks have been added to the queue, which doesn't mean they
// have all been properly imported.
if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) {
// Importing is done, we can log the result and return.
info!(
"🎉 Imported {} blocks. Best: #{}",
read_block_count, client.usage_info().chain.best_number
);
return Poll::Ready(Ok(()))
} else {
// Importing is not done, we still have to wait for the queue to finish.
// Wait for the delay, because we know the queue is lagging behind.
match Pin::new(&mut delay).poll(cx) {
Poll::Pending => {
state = Some(ImportState::WaitingForImportQueueToFinish {
num_expected_blocks, read_block_count, delay
});
return Poll::Pending
},
Poll::Ready(_) => {
delay.reset(Duration::from_millis(DELAY_TIME));
},
}
state = Some(ImportState::WaitingForImportQueueToFinish {
num_expected_blocks, read_block_count, delay
});
}
}
}
queue.poll_actions(cx, &mut link);
let best_number = client.usage_info().chain.best_number;
speedometer.notify_user(best_number);
if link.has_error {
return Poll::Ready(Err(
Error::Other(
format!("Stopping after #{} blocks because of an error", link.imported_blocks)
)
))
}
cx.waker().wake_by_ref();
Poll::Pending
});
Box::pin(import)
}
@@ -0,0 +1,29 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Chain utilities.
mod check_block;
mod export_blocks;
mod export_raw_state;
mod import_blocks;
mod revert_chain;
pub use check_block::*;
pub use export_blocks::*;
pub use export_raw_state::*;
pub use import_blocks::*;
pub use revert_chain::*;
@@ -0,0 +1,43 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
use crate::error::Error;
use log::info;
use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
use sc_client_api::{Backend, UsageProvider};
use std::sync::Arc;
/// Performs a revert of `blocks` blocks.
pub fn revert_chain<B, BA, C>(
client: Arc<C>,
backend: Arc<BA>,
blocks: NumberFor<B>
) -> Result<(), Error>
where
B: BlockT,
C: UsageProvider<B>,
BA: Backend<B>,
{
let reverted = backend.revert(blocks, false)?;
let info = client.usage_info().chain;
if reverted.is_zero() {
info!("There aren't any non-finalized blocks to revert.");
} else {
info!("Reverted {} blocks. Best: #{} ({})", reverted, info.best_number, info.best_hash);
}
Ok(())
}
+2 -2
View File
@@ -23,7 +23,6 @@
#![recursion_limit="128"]
pub mod config;
#[macro_use]
pub mod chain_ops;
pub mod error;
@@ -55,7 +54,7 @@ use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver,
pub use self::error::Error;
pub use self::builder::{
new_full_client, new_client,
ServiceBuilder, ServiceBuilderCommand, TFullClient, TLightClient, TFullBackend, TLightBackend,
ServiceBuilder, TFullClient, TLightClient, TFullBackend, TLightBackend,
TFullCallExecutor, TLightCallExecutor, RpcExtensionBuilder,
};
pub use config::{
@@ -79,6 +78,7 @@ pub use sc_network::config::{
pub use sc_tracing::TracingReceiver;
pub use task_manager::SpawnTaskHandle;
pub use task_manager::TaskManager;
pub use sp_consensus::import_queue::ImportQueue;
use sc_client_api::{Backend, BlockchainEvents};
const DEFAULT_PROTOCOL_ID: &str = "sup";