node: exit on GRANDPA voter or BABE authoring error (#3353)

* node: exit on GRANDPA voter or BABE authoring error

* node: exit process with non-zero return code when service fails

* service: rename infallible task to essential task

* service: revert field name changes

* core: fix service testnet
This commit is contained in:
André Silva
2019-08-12 14:54:30 +01:00
committed by Robert Habermeier
parent 09b57261df
commit 70d716dc48
10 changed files with 98 additions and 41 deletions
+12 -1
View File
@@ -36,7 +36,17 @@ pub enum Error {
Input(String),
/// Invalid listen multiaddress
#[display(fmt="Invalid listen multiaddress")]
InvalidListenMultiaddress
InvalidListenMultiaddress,
/// Other uncategorized error.
Other(String),
}
/// Must be implemented explicitly because `derive_more` won't generate this
/// case due to conflicting derive for `Other(String)`.
impl std::convert::From<String> for Error {
fn from(s: String) -> Error {
Error::Input(s)
}
}
impl std::error::Error for Error {
@@ -48,6 +58,7 @@ impl std::error::Error for Error {
Error::Client(ref err) => Some(err),
Error::Input(_) => None,
Error::InvalidListenMultiaddress => None,
Error::Other(_) => None,
}
}
}
+4 -4
View File
@@ -53,7 +53,7 @@
//! included in the newly-finalized chain.
use futures::prelude::*;
use log::{debug, info, warn};
use log::{debug, error, info};
use futures::sync::mpsc;
use client::{
BlockchainEvents, CallExecutor, Client, backend::Backend, error::Error as ClientError,
@@ -680,8 +680,8 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
poll_voter.select2(voter_commands_rx).then(move |res| match res {
Ok(future::Either::A(((), _))) => {
// voters don't conclude naturally; this could reasonably be an error.
Ok(FutureLoop::Break(()))
// voters don't conclude naturally
Err(Error::Safety("GRANDPA voter has concluded.".into()))
},
Err(future::Either::B(_)) => {
// the `voter_commands_rx` stream should not fail.
@@ -709,7 +709,7 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
let voter_work = voter_work
.map(|_| ())
.map_err(|e| {
warn!("GRANDPA Voter failed: {:?}", e);
error!("GRANDPA Voter failed: {:?}", e);
telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e);
});
+2 -2
View File
@@ -502,7 +502,7 @@ impl<Factory: ServiceFactory> DerefMut for FullComponents<Factory> {
impl<Factory: ServiceFactory> Future for FullComponents<Factory> {
type Item = ();
type Error = ();
type Error = super::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.service.poll()
@@ -627,7 +627,7 @@ impl<Factory: ServiceFactory> DerefMut for LightComponents<Factory> {
impl<Factory: ServiceFactory> Future for LightComponents<Factory> {
type Item = ();
type Error = ();
type Error = super::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.service.poll()
+28 -3
View File
@@ -28,6 +28,7 @@ pub mod error;
use std::io;
use std::net::SocketAddr;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use futures::sync::mpsc;
use parking_lot::Mutex;
@@ -82,8 +83,14 @@ pub struct Service<Components: components::Components> {
NetworkStatus<ComponentBlock<Components>>, NetworkState
)>>>>,
transaction_pool: Arc<TransactionPool<Components::TransactionPoolApi>>,
/// A future that resolves when the service has exited. This is useful to
/// make sure any internally spawned futures stop when the service does.
exit: exit_future::Exit,
/// A signal that makes the exit future above resolve, fired on service drop.
signal: Option<Signal>,
/// Set to `true` when a spawned essential task has failed. The next time
/// the service future is polled it should complete with an error.
essential_failed: Arc<AtomicBool>,
/// Sender for futures that must be spawned as background tasks.
to_spawn_tx: mpsc::UnboundedSender<Box<dyn Future<Item = (), Error = ()> + Send>>,
/// Receiver for futures that must be spawned as background tasks.
@@ -395,7 +402,7 @@ impl<Components: components::Components> Service<Components> {
// Telemetry
let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
let is_authority = config.roles == Roles::AUTHORITY;
let is_authority = config.roles.is_authority();
let network_id = network.local_peer_id().to_base58();
let name = config.name.clone();
let impl_name = config.impl_name.to_owned();
@@ -440,12 +447,13 @@ impl<Components: components::Components> Service<Components> {
network_status_sinks,
select_chain,
transaction_pool,
exit,
signal: Some(signal),
essential_failed: Arc::new(AtomicBool::new(false)),
to_spawn_tx,
to_spawn_rx,
to_poll: Vec::new(),
config,
exit,
rpc_handlers,
_rpc: rpc,
_telemetry: telemetry,
@@ -491,6 +499,19 @@ impl<Components: components::Components> Service<Components> {
let _ = self.to_spawn_tx.unbounded_send(Box::new(task));
}
/// Spawns a task in the background that runs the future passed as
/// parameter. The given task is considered essential, i.e. if it errors we
/// trigger a service exit.
pub fn spawn_essential_task(&self, task: impl Future<Item = (), Error = ()> + Send + 'static) {
let essential_failed = self.essential_failed.clone();
let essential_task = Box::new(task.map_err(move |_| {
error!("Essential task failed. Shutting down service.");
essential_failed.store(true, Ordering::Relaxed);
}));
let _ = self.to_spawn_tx.unbounded_send(essential_task);
}
/// Returns a handle for spawning tasks.
pub fn spawn_task_handle(&self) -> SpawnTaskHandle {
SpawnTaskHandle {
@@ -548,9 +569,13 @@ impl<Components: components::Components> Service<Components> {
impl<Components> Future for Service<Components> where Components: components::Components {
type Item = ();
type Error = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.essential_failed.load(Ordering::Relaxed) {
return Err(Error::Other("Essential task failed.".into()));
}
while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() {
let executor = tokio_executor::DefaultExecutor::current();
if let Err(err) = executor.execute(task_to_spawn) {
+13 -13
View File
@@ -73,9 +73,9 @@ impl<T> From<T> for SyncService<T> {
}
}
impl<T: Future<Item=(), Error=()>> Future for SyncService<T> {
impl<T: Future<Item=(), Error=service::Error>> Future for SyncService<T> {
type Item = ();
type Error = ();
type Error = service::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.lock().unwrap().poll()
@@ -195,8 +195,8 @@ fn node_config<F: ServiceFactory> (
}
impl<F: ServiceFactory> TestNet<F> where
F::FullService: Future<Item=(), Error=()>,
F::LightService: Future<Item=(), Error=()>
F::FullService: Future<Item=(), Error=service::Error>,
F::LightService: Future<Item=(), Error=service::Error>,
{
fn new(
temp: &TempDir,
@@ -239,7 +239,7 @@ impl<F: ServiceFactory> TestNet<F> where
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let service = SyncService::from(F::new_full(node_config).expect("Error creating test node service"));
executor.spawn(service.clone());
executor.spawn(service.clone().map_err(|_| ()));
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into()));
((index + nodes), service, addr)
}));
@@ -250,7 +250,7 @@ impl<F: ServiceFactory> TestNet<F> where
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let service = SyncService::from(F::new_full(node_config).expect("Error creating test node service"));
executor.spawn(service.clone());
executor.spawn(service.clone().map_err(|_| ()));
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into()));
(index, service, addr)
}));
@@ -261,7 +261,7 @@ impl<F: ServiceFactory> TestNet<F> where
let addr = node_config.network.listen_addresses.iter().next().unwrap().clone();
let service = SyncService::from(F::new_light(node_config).expect("Error creating test node service"));
executor.spawn(service.clone());
executor.spawn(service.clone().map_err(|_| ()));
let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into()));
(index, service, addr)
}));
@@ -272,8 +272,8 @@ impl<F: ServiceFactory> TestNet<F> where
}
pub fn connectivity<F: ServiceFactory>(spec: FactoryChainSpec<F>) where
F::FullService: Future<Item=(), Error=()>,
F::LightService: Future<Item=(), Error=()>,
F::FullService: Future<Item=(), Error=service::Error>,
F::LightService: Future<Item=(), Error=service::Error>,
{
const NUM_FULL_NODES: usize = 5;
const NUM_LIGHT_NODES: usize = 5;
@@ -347,8 +347,8 @@ pub fn connectivity<F: ServiceFactory>(spec: FactoryChainSpec<F>) where
pub fn sync<F, B, E>(spec: FactoryChainSpec<F>, mut block_factory: B, mut extrinsic_factory: E) where
F: ServiceFactory,
F::FullService: Future<Item=(), Error=()>,
F::LightService: Future<Item=(), Error=()>,
F::FullService: Future<Item=(), Error=service::Error>,
F::LightService: Future<Item=(), Error=service::Error>,
B: FnMut(&SyncService<F::FullService>) -> BlockImportParams<F::Block>,
E: FnMut(&SyncService<F::FullService>) -> FactoryExtrinsic<F>,
{
@@ -406,8 +406,8 @@ pub fn sync<F, B, E>(spec: FactoryChainSpec<F>, mut block_factory: B, mut extrin
pub fn consensus<F>(spec: FactoryChainSpec<F>, authorities: Vec<String>) where
F: ServiceFactory,
F::FullService: Future<Item=(), Error=()>,
F::LightService: Future<Item=(), Error=()>,
F::FullService: Future<Item=(), Error=service::Error>,
F::LightService: Future<Item=(), Error=service::Error>,
{
const NUM_FULL_NODES: usize = 10;
const NUM_LIGHT_NODES: usize = 0;
+16 -7
View File
@@ -59,11 +59,11 @@ fn run_until_exit<T, C, E>(
mut runtime: Runtime,
service: T,
e: E,
) -> error::Result<()>
where
T: Deref<Target=substrate_service::Service<C>> + Future<Item = (), Error = ()> + Send + 'static,
C: substrate_service::Components,
E: IntoExit,
) -> error::Result<()> where
T: Deref<Target=substrate_service::Service<C>>,
T: Future<Item = (), Error = substrate_service::error::Error> + Send + 'static,
C: substrate_service::Components,
E: IntoExit,
{
let (exit_send, exit) = exit_future::signal();
@@ -74,10 +74,19 @@ fn run_until_exit<T, C, E>(
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();
let _ = runtime.block_on(service.select(e.into_exit()));
let service_res = {
let exit = e.into_exit().map_err(|_| error::Error::Other("Exit future failed.".into()));
let service = service.map_err(|err| error::Error::Service(err));
let select = service.select(exit).map(|_| ()).map_err(|(err, _)| err);
runtime.block_on(select)
};
exit_send.fire();
Ok(())
// TODO [andre]: timeout this future #1318
let _ = runtime.shutdown_on_idle().wait();
service_res
}
// handles ctrl-c
+1 -1
View File
@@ -21,7 +21,7 @@ fn main() {
};
if let Err(e) = cli::run(::std::env::args(), cli::Exit, version) {
eprintln!("Error starting the node: {}\n\n{:?}", e, e);
eprintln!("Fatal error: {}\n\n{:?}", e, e);
std::process::exit(1)
}
}
+13 -7
View File
@@ -223,11 +223,11 @@ fn run_until_exit<T, C, E>(
mut runtime: Runtime,
service: T,
e: E,
) -> error::Result<()>
where
T: Deref<Target=substrate_service::Service<C>> + Future<Item = (), Error = ()> + Send + 'static,
C: substrate_service::Components,
E: IntoExit,
) -> error::Result<()> where
T: Deref<Target=substrate_service::Service<C>>,
T: Future<Item = (), Error = substrate_service::error::Error> + Send + 'static,
C: substrate_service::Components,
E: IntoExit,
{
let (exit_send, exit) = exit_future::signal();
@@ -238,11 +238,17 @@ fn run_until_exit<T, C, E>(
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();
let _ = runtime.block_on(service.select(e.into_exit()));
let service_res = {
let exit = e.into_exit().map_err(|_| error::Error::Other("Exit future failed.".into()));
let service = service.map_err(|err| error::Error::Service(err));
let select = service.select(exit).map(|_| ()).map_err(|(err, _)| err);
runtime.block_on(select)
};
exit_send.fire();
// TODO [andre]: timeout this future #1318
let _ = runtime.shutdown_on_idle().wait();
Ok(())
service_res
}
+8 -2
View File
@@ -152,7 +152,10 @@ construct_service_factory! {
let babe = start_babe(babe_config)?;
let select = babe.select(service.on_exit()).then(|_| Ok(()));
service.spawn_task(Box::new(select));
// the BABE authoring task is considered essential, i.e. if it
// fails we take down the service with it.
service.spawn_essential_task(select);
}
let config = grandpa::Config {
@@ -187,7 +190,10 @@ construct_service_factory! {
on_exit: service.on_exit(),
telemetry_on_connect: Some(telemetry_on_connect),
};
service.spawn_task(Box::new(grandpa::run_grandpa_voter(grandpa_config)?));
// the GRANDPA voter task is considered essential, i.e.
// if it fails we take down the service with it.
service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?);
},
(_, true) => {
grandpa::setup_disabled_grandpa(
+1 -1
View File
@@ -55,7 +55,7 @@ fn main() {
};
if let Err(e) = cli::run(::std::env::args(), Exit, version) {
eprintln!("Error starting the node: {}\n\n{:?}", e, e);
eprintln!("Fatal error: {}\n\n{:?}", e, e);
std::process::exit(1)
}
}