mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-05-30 15:11:02 +00:00
handle exit and avoid threads hanging (#137)
* barrier on starting network * handle exit better * give consensus service its own internal exit signal * update comment * remove stop_notifications and fix build
This commit is contained in:
committed by
Arkadiy Paronyan
parent
a3eec9362f
commit
7f2c798a06
@@ -68,6 +68,15 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
|
|||||||
T: Into<std::ffi::OsString> + Clone,
|
T: Into<std::ffi::OsString> + Clone,
|
||||||
{
|
{
|
||||||
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
|
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
|
||||||
|
let exit = {
|
||||||
|
// can't use signal directly here because CtrlC takes only `Fn`.
|
||||||
|
let (exit_send, exit) = mpsc::channel(1);
|
||||||
|
ctrlc::CtrlC::set_handler(move || {
|
||||||
|
exit_send.clone().send(()).wait().expect("Error sending exit notification");
|
||||||
|
});
|
||||||
|
|
||||||
|
exit
|
||||||
|
};
|
||||||
|
|
||||||
let yaml = load_yaml!("./cli.yml");
|
let yaml = load_yaml!("./cli.yml");
|
||||||
let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) {
|
let matches = match clap::App::from_yaml(yaml).version(crate_version!()).get_matches_from_safe(args) {
|
||||||
@@ -140,11 +149,6 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
|
|||||||
|
|
||||||
informant::start(&service, core.handle());
|
informant::start(&service, core.handle());
|
||||||
|
|
||||||
let (exit_send, exit) = mpsc::channel(1);
|
|
||||||
ctrlc::CtrlC::set_handler(move || {
|
|
||||||
exit_send.clone().send(()).wait().expect("Error sending exit notification");
|
|
||||||
});
|
|
||||||
|
|
||||||
let _rpc_servers = {
|
let _rpc_servers = {
|
||||||
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?;
|
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?;
|
||||||
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?;
|
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?;
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ tokio-core = "0.1.12"
|
|||||||
ed25519 = { path = "../../substrate/ed25519" }
|
ed25519 = { path = "../../substrate/ed25519" }
|
||||||
error-chain = "0.11"
|
error-chain = "0.11"
|
||||||
log = "0.3"
|
log = "0.3"
|
||||||
|
exit-future = "0.1"
|
||||||
polkadot-api = { path = "../api" }
|
polkadot-api = { path = "../api" }
|
||||||
polkadot-collator = { path = "../collator" }
|
polkadot-collator = { path = "../collator" }
|
||||||
polkadot-primitives = { path = "../primitives" }
|
polkadot-primitives = { path = "../primitives" }
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ extern crate substrate_primitives as primitives;
|
|||||||
extern crate substrate_runtime_support as runtime_support;
|
extern crate substrate_runtime_support as runtime_support;
|
||||||
extern crate substrate_network;
|
extern crate substrate_network;
|
||||||
|
|
||||||
|
extern crate exit_future;
|
||||||
extern crate tokio_core;
|
extern crate tokio_core;
|
||||||
extern crate substrate_keyring;
|
extern crate substrate_keyring;
|
||||||
extern crate substrate_client as client;
|
extern crate substrate_client as client;
|
||||||
|
|||||||
@@ -218,11 +218,6 @@ impl<E> Sink for BftSink<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Consensus service. Starts working when created.
|
|
||||||
pub struct Service {
|
|
||||||
thread: Option<thread::JoinHandle<()>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Network(Arc<net::ConsensusService>);
|
struct Network(Arc<net::ConsensusService>);
|
||||||
|
|
||||||
fn start_bft<F, C>(
|
fn start_bft<F, C>(
|
||||||
@@ -259,16 +254,24 @@ fn start_bft<F, C>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Consensus service. Starts working when created.
|
||||||
|
pub struct Service {
|
||||||
|
thread: Option<thread::JoinHandle<()>>,
|
||||||
|
exit_signal: Option<::exit_future::Signal>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
/// Create and start a new instance.
|
/// Create and start a new instance.
|
||||||
pub fn new<C>(
|
pub fn new<C>(
|
||||||
client: Arc<C>,
|
client: Arc<C>,
|
||||||
network: Arc<net::ConsensusService>,
|
network: Arc<net::ConsensusService>,
|
||||||
transaction_pool: Arc<Mutex<TransactionPool>>,
|
transaction_pool: Arc<Mutex<TransactionPool>>,
|
||||||
key: ed25519::Pair
|
key: ed25519::Pair,
|
||||||
) -> Service
|
) -> Service
|
||||||
where C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
|
where
|
||||||
|
C: BlockchainEvents + ChainHead + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
|
let (signal, exit) = ::exit_future::signal();
|
||||||
let thread = thread::spawn(move || {
|
let thread = thread::spawn(move || {
|
||||||
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
|
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
|
||||||
let key = Arc::new(key);
|
let key = Arc::new(key);
|
||||||
@@ -281,15 +284,26 @@ impl Service {
|
|||||||
let messages = SharedMessageCollection::new();
|
let messages = SharedMessageCollection::new();
|
||||||
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
|
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));
|
||||||
|
|
||||||
let handle = core.handle();
|
let notifications = {
|
||||||
let notifications = client.import_notification_stream().for_each(|notification| {
|
let handle = core.handle();
|
||||||
if notification.is_new_best {
|
let network = network.clone();
|
||||||
start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
|
let client = client.clone();
|
||||||
}
|
let bft_service = bft_service.clone();
|
||||||
Ok(())
|
let messages = messages.clone();
|
||||||
});
|
|
||||||
|
|
||||||
let interval = reactor::Interval::new_at(Instant::now() + Duration::from_millis(TIMER_DELAY_MS), Duration::from_millis(TIMER_INTERVAL_MS), &handle).unwrap();
|
client.import_notification_stream().for_each(move |notification| {
|
||||||
|
if notification.is_new_best {
|
||||||
|
start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let interval = reactor::Interval::new_at(
|
||||||
|
Instant::now() + Duration::from_millis(TIMER_DELAY_MS),
|
||||||
|
Duration::from_millis(TIMER_INTERVAL_MS),
|
||||||
|
&core.handle(),
|
||||||
|
).expect("it is always possible to create an interval with valid params");
|
||||||
let mut prev_best = match client.best_block_header() {
|
let mut prev_best = match client.best_block_header() {
|
||||||
Ok(header) => header.blake2_256(),
|
Ok(header) => header.blake2_256(),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -297,36 +311,47 @@ impl Service {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let c = client.clone();
|
|
||||||
let s = bft_service.clone();
|
let timed = {
|
||||||
let n = network.clone();
|
let c = client.clone();
|
||||||
let m = messages.clone();
|
let s = bft_service.clone();
|
||||||
let handle = core.handle();
|
let n = network.clone();
|
||||||
let timed = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
|
let m = messages.clone();
|
||||||
if let Ok(best_block) = c.best_block_header() {
|
let handle = core.handle();
|
||||||
let hash = best_block.blake2_256();
|
|
||||||
m.collect_garbage();
|
interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
|
||||||
if hash == prev_best {
|
if let Ok(best_block) = c.best_block_header() {
|
||||||
debug!("Starting consensus round after a timeout");
|
let hash = best_block.blake2_256();
|
||||||
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
|
m.collect_garbage();
|
||||||
|
if hash == prev_best {
|
||||||
|
debug!("Starting consensus round after a timeout");
|
||||||
|
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
|
||||||
|
}
|
||||||
|
prev_best = hash;
|
||||||
}
|
}
|
||||||
prev_best = hash;
|
Ok(())
|
||||||
}
|
})
|
||||||
Ok(())
|
};
|
||||||
});
|
|
||||||
|
core.handle().spawn(notifications);
|
||||||
core.handle().spawn(timed);
|
core.handle().spawn(timed);
|
||||||
if let Err(e) = core.run(notifications) {
|
if let Err(e) = core.run(exit) {
|
||||||
debug!("BFT event loop error {:?}", e);
|
debug!("BFT event loop error {:?}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Service {
|
Service {
|
||||||
thread: Some(thread)
|
thread: Some(thread),
|
||||||
|
exit_signal: Some(signal),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for Service {
|
impl Drop for Service {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
|
if let Some(signal) = self.exit_signal.take() {
|
||||||
|
signal.fire();
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(thread) = self.thread.take() {
|
if let Some(thread) = self.thread.take() {
|
||||||
thread.join().expect("The service thread has panicked");
|
thread.join().expect("The service thread has panicked");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ tokio-timer = "0.1.2"
|
|||||||
error-chain = "0.11"
|
error-chain = "0.11"
|
||||||
log = "0.3"
|
log = "0.3"
|
||||||
tokio-core = "0.1.12"
|
tokio-core = "0.1.12"
|
||||||
|
exit-future = "0.1"
|
||||||
ed25519 = { path = "../../substrate/ed25519" }
|
ed25519 = { path = "../../substrate/ed25519" }
|
||||||
polkadot-primitives = { path = "../primitives" }
|
polkadot-primitives = { path = "../primitives" }
|
||||||
polkadot-runtime = { path = "../runtime" }
|
polkadot-runtime = { path = "../runtime" }
|
||||||
|
|||||||
+46
-19
@@ -28,14 +28,15 @@ extern crate polkadot_api;
|
|||||||
extern crate polkadot_consensus as consensus;
|
extern crate polkadot_consensus as consensus;
|
||||||
extern crate polkadot_transaction_pool as transaction_pool;
|
extern crate polkadot_transaction_pool as transaction_pool;
|
||||||
extern crate polkadot_keystore as keystore;
|
extern crate polkadot_keystore as keystore;
|
||||||
|
extern crate substrate_client as client;
|
||||||
extern crate substrate_runtime_io as runtime_io;
|
extern crate substrate_runtime_io as runtime_io;
|
||||||
extern crate substrate_primitives as primitives;
|
extern crate substrate_primitives as primitives;
|
||||||
extern crate substrate_network as network;
|
extern crate substrate_network as network;
|
||||||
extern crate substrate_codec as codec;
|
extern crate substrate_codec as codec;
|
||||||
extern crate substrate_executor;
|
extern crate substrate_executor;
|
||||||
|
|
||||||
|
extern crate exit_future;
|
||||||
extern crate tokio_core;
|
extern crate tokio_core;
|
||||||
extern crate substrate_client as client;
|
|
||||||
|
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate error_chain;
|
extern crate error_chain;
|
||||||
@@ -65,6 +66,7 @@ use polkadot_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyC
|
|||||||
use client::{genesis, BlockchainEvents};
|
use client::{genesis, BlockchainEvents};
|
||||||
use client::in_mem::Backend as InMemory;
|
use client::in_mem::Backend as InMemory;
|
||||||
use network::ManageNetwork;
|
use network::ManageNetwork;
|
||||||
|
use exit_future::Signal;
|
||||||
|
|
||||||
pub use self::error::{ErrorKind, Error};
|
pub use self::error::{ErrorKind, Error};
|
||||||
pub use config::{Configuration, Role, ChainSpec};
|
pub use config::{Configuration, Role, ChainSpec};
|
||||||
@@ -77,6 +79,7 @@ pub struct Service {
|
|||||||
client: Arc<Client>,
|
client: Arc<Client>,
|
||||||
network: Arc<network::Service>,
|
network: Arc<network::Service>,
|
||||||
transaction_pool: Arc<Mutex<TransactionPool>>,
|
transaction_pool: Arc<Mutex<TransactionPool>>,
|
||||||
|
signal: Option<Signal>,
|
||||||
_consensus: Option<consensus::Service>,
|
_consensus: Option<consensus::Service>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -242,6 +245,10 @@ fn local_testnet_config() -> ChainConfig {
|
|||||||
impl Service {
|
impl Service {
|
||||||
/// Creates and register protocol with the network service
|
/// Creates and register protocol with the network service
|
||||||
pub fn new(mut config: Configuration) -> Result<Service, error::Error> {
|
pub fn new(mut config: Configuration) -> Result<Service, error::Error> {
|
||||||
|
use std::sync::Barrier;
|
||||||
|
|
||||||
|
let (signal, exit) = ::exit_future::signal();
|
||||||
|
|
||||||
// Create client
|
// Create client
|
||||||
let executor = polkadot_executor::Executor::new();
|
let executor = polkadot_executor::Executor::new();
|
||||||
let mut storage = Default::default();
|
let mut storage = Default::default();
|
||||||
@@ -284,7 +291,39 @@ impl Service {
|
|||||||
chain: client.clone(),
|
chain: client.clone(),
|
||||||
transaction_pool: transaction_pool_adapter,
|
transaction_pool: transaction_pool_adapter,
|
||||||
};
|
};
|
||||||
|
|
||||||
let network = network::Service::new(network_params)?;
|
let network = network::Service::new(network_params)?;
|
||||||
|
let barrier = ::std::sync::Arc::new(Barrier::new(2));
|
||||||
|
|
||||||
|
let thread = {
|
||||||
|
let client = client.clone();
|
||||||
|
let network = network.clone();
|
||||||
|
let txpool = transaction_pool.clone();
|
||||||
|
|
||||||
|
let thread_barrier = barrier.clone();
|
||||||
|
thread::spawn(move || {
|
||||||
|
network.start_network();
|
||||||
|
|
||||||
|
thread_barrier.wait();
|
||||||
|
let mut core = Core::new().expect("tokio::Core could not be created");
|
||||||
|
let events = client.import_notification_stream().for_each(move |notification| {
|
||||||
|
network.on_block_imported(notification.hash, ¬ification.header);
|
||||||
|
prune_imported(&*client, &*txpool, notification.hash);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
core.handle().spawn(events);
|
||||||
|
if let Err(e) = core.run(exit) {
|
||||||
|
debug!("Polkadot service event loop shutdown with {:?}", e);
|
||||||
|
}
|
||||||
|
debug!("Polkadot service shutdown");
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
// wait for the network to start up before starting the consensus
|
||||||
|
// service.
|
||||||
|
barrier.wait();
|
||||||
|
|
||||||
// Spin consensus service if configured
|
// Spin consensus service if configured
|
||||||
let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR {
|
let consensus_service = if config.roles & Role::VALIDATOR == Role::VALIDATOR {
|
||||||
@@ -296,28 +335,12 @@ impl Service {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let thread_client = client.clone();
|
|
||||||
let thread_network = network.clone();
|
|
||||||
let thread_txpool = transaction_pool.clone();
|
|
||||||
let thread = thread::spawn(move || {
|
|
||||||
thread_network.start_network();
|
|
||||||
let mut core = Core::new().expect("tokio::Core could not be created");
|
|
||||||
let events = thread_client.import_notification_stream().for_each(|notification| {
|
|
||||||
thread_network.on_block_imported(notification.hash, ¬ification.header);
|
|
||||||
prune_imported(&*thread_client, &*thread_txpool, notification.hash);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
if let Err(e) = core.run(events) {
|
|
||||||
debug!("Polkadot service event loop shutdown with {:?}", e);
|
|
||||||
}
|
|
||||||
debug!("Polkadot service shutdown");
|
|
||||||
});
|
|
||||||
Ok(Service {
|
Ok(Service {
|
||||||
thread: Some(thread),
|
thread: Some(thread),
|
||||||
client: client,
|
client: client,
|
||||||
network: network,
|
network: network,
|
||||||
transaction_pool: transaction_pool,
|
transaction_pool: transaction_pool,
|
||||||
|
signal: Some(signal),
|
||||||
_consensus: consensus_service,
|
_consensus: consensus_service,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -357,8 +380,12 @@ pub fn prune_imported(client: &Client, pool: &Mutex<TransactionPool>, hash: Head
|
|||||||
|
|
||||||
impl Drop for Service {
|
impl Drop for Service {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
self.client.stop_notifications();
|
|
||||||
self.network.stop_network();
|
self.network.stop_network();
|
||||||
|
|
||||||
|
if let Some(signal) = self.signal.take() {
|
||||||
|
signal.fire();
|
||||||
|
}
|
||||||
|
|
||||||
if let Some(thread) = self.thread.take() {
|
if let Some(thread) = self.thread.take() {
|
||||||
thread.join().expect("The service thread has panicked");
|
thread.join().expect("The service thread has panicked");
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user