Collator node workflow (#280)

* arbitrary application logic in CLI

* collation work

* split up exit and work futures in application

* collation node workflow

* typo

* indentation fix

* doc grumbles

* rename Application to Worker

* refactor Worker::exit to exit_only
This commit is contained in:
Robert Habermeier
2018-07-06 15:13:31 +01:00
committed by Sergey Pepyakin
parent be5ff4e62f
commit 9b254c3075
15 changed files with 294 additions and 97 deletions
+60 -35
View File
@@ -24,10 +24,9 @@ extern crate atty;
extern crate ansi_term;
extern crate regex;
extern crate time;
extern crate fdlimit;
extern crate futures;
extern crate tokio;
extern crate ctrlc;
extern crate fdlimit;
extern crate ed25519;
extern crate triehash;
extern crate parking_lot;
@@ -66,6 +65,11 @@ mod informant;
mod chain_spec;
pub use chain_spec::ChainSpec;
pub use client::error::Error as ClientError;
pub use client::backend::Backend as ClientBackend;
pub use state_machine::Backend as StateMachineBackend;
pub use polkadot_primitives::Block as PolkadotBlock;
pub use service::{Components as ServiceComponents, Service};
use std::io::{self, Write, Read, stdin, stdout};
use std::fs::File;
@@ -117,6 +121,26 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
.unwrap_or_else(default_base_path)
}
/// Additional worker making use of the node, to run asynchronously before shutdown.
///
/// This will be invoked with the service and spawn a future that resolves
/// when complete.
pub trait Worker {
/// A future that resolves when the work is done or the node should exit.
/// This will be run on a tokio runtime.
type Work: Future<Item=(),Error=()>;
/// An exit scheduled for the future.
type Exit: Future<Item=(),Error=()> + Send + 'static;
/// Don't work, but schedule an exit.
fn exit_only(self) -> Self::Exit;
/// Do work and schedule exit.
fn work<C: ServiceComponents>(self, service: &Service<C>) -> Self::Work
where ClientError: From<<<<C as ServiceComponents>::Backend as ClientBackend<PolkadotBlock>>::State as StateMachineBackend>::Error>;
}
/// Parse command line arguments and start the node.
///
/// IANA unassigned port ranges that we could use:
@@ -125,9 +149,10 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
/// 9556-9591 Unassigned
/// 9803-9874 Unassigned
/// 9926-9949 Unassigned
pub fn run<I, T>(args: I) -> error::Result<()> where
pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone,
W: Worker,
{
let yaml = load_yaml!("./cli.yml");
let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) {
@@ -154,11 +179,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
}
if let Some(matches) = matches.subcommand_matches("export-blocks") {
return export_blocks(matches);
return export_blocks(matches, worker.exit_only());
}
if let Some(matches) = matches.subcommand_matches("import-blocks") {
return import_blocks(matches);
return import_blocks(matches, worker.exit_only());
}
let spec = load_spec(&matches)?;
@@ -255,8 +280,8 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
};
match role == service::Role::LIGHT {
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf)?,
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
}
// TODO: hard exit if this stalls?
@@ -272,16 +297,19 @@ fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}
fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn export_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let base_path = base_path(matches);
let spec = load_spec(&matches)?;
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
info!("DB path: {}", config.database_path);
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();
::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
info!("Exporting blocks");
let mut block: u32 = match matches.value_of("from") {
@@ -310,7 +338,7 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
}
loop {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match client.block(&BlockId::number(block as u64))? {
@@ -334,15 +362,19 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}
fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn import_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let spec = load_spec(&matches)?;
let base_path = base_path(matches);
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();
::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
let mut file: Box<Read> = match matches.value_of("INPUT") {
@@ -354,7 +386,7 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
let count: u32 = Slicable::decode(&mut file).ok_or("Error reading file")?;
let mut block = 0;
for _ in 0 .. count {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match SignedBlock::decode(&mut file) {
@@ -377,27 +409,19 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}
fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()>
fn run_until_exit<C, W>(
runtime: &mut Runtime,
service: service::Service<C>,
matches: &clap::ArgMatches,
sys_conf: SystemConfiguration,
worker: W,
) -> error::Result<()>
where
C: service::Components,
W: Worker,
client::error::Error: From<<<<C as service::Components>::Backend as client::backend::Backend<Block>>::State as state_machine::Backend>::Error>,
{
let exit = {
let (exit_send, exit) = exit_future::signal();
let exit_send = ::std::cell::RefCell::new(Some(exit_send));
ctrlc::CtrlC::set_handler(move || {
let exit_send = exit_send
.try_borrow_mut()
.expect("only borrowed in non-reetrant signal handler; qed")
.take();
if let Some(signal) = exit_send {
signal.fire();
}
});
exit
};
let (exit_send, exit) = exit_future::signal();
let executor = runtime.executor();
informant::start(&service, exit.clone(), executor.clone());
@@ -422,7 +446,8 @@ fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matche
)
};
let _ = exit.wait();
let _ = worker.work(&service).wait();
exit_send.fire();
Ok(())
}