diff --git a/substrate/core/cli/src/error.rs b/substrate/core/cli/src/error.rs index b052a29710..600e73d44f 100644 --- a/substrate/core/cli/src/error.rs +++ b/substrate/core/cli/src/error.rs @@ -36,7 +36,17 @@ pub enum Error { Input(String), /// Invalid listen multiaddress #[display(fmt="Invalid listen multiaddress")] - InvalidListenMultiaddress + InvalidListenMultiaddress, + /// Other uncategorized error. + Other(String), +} + +/// Must be implemented explicitly because `derive_more` won't generate this +/// case due to conflicting derive for `Other(String)`. +impl std::convert::From<String> for Error { + fn from(s: String) -> Error { + Error::Input(s) + } } impl std::error::Error for Error { @@ -48,6 +58,7 @@ impl std::error::Error for Error { Error::Client(ref err) => Some(err), Error::Input(_) => None, Error::InvalidListenMultiaddress => None, + Error::Other(_) => None, } } } diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index f5010debdc..5c6835e3bb 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -53,7 +53,7 @@ //! included in the newly-finalized chain. use futures::prelude::*; -use log::{debug, info, warn}; +use log::{debug, error, info}; use futures::sync::mpsc; use client::{ BlockchainEvents, CallExecutor, Client, backend::Backend, error::Error as ClientError, @@ -680,8 +680,8 @@ pub fn run_grandpa_voter, N, RA, SC, X>( poll_voter.select2(voter_commands_rx).then(move |res| match res { Ok(future::Either::A(((), _))) => { - // voters don't conclude naturally; this could reasonably be an error. - Ok(FutureLoop::Break(())) + // voters don't conclude naturally + Err(Error::Safety("GRANDPA voter has concluded.".into())) }, Err(future::Either::B(_)) => { // the `voter_commands_rx` stream should not fail. 
@@ -709,7 +709,7 @@ pub fn run_grandpa_voter, N, RA, SC, X>( let voter_work = voter_work .map(|_| ()) .map_err(|e| { - warn!("GRANDPA Voter failed: {:?}", e); + error!("GRANDPA Voter failed: {:?}", e); telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); }); diff --git a/substrate/core/service/src/components.rs b/substrate/core/service/src/components.rs index 8140b984dc..3c566b5974 100644 --- a/substrate/core/service/src/components.rs +++ b/substrate/core/service/src/components.rs @@ -502,7 +502,7 @@ impl DerefMut for FullComponents { impl Future for FullComponents { type Item = (); - type Error = (); + type Error = super::Error; fn poll(&mut self) -> Poll { self.service.poll() @@ -627,7 +627,7 @@ impl DerefMut for LightComponents { impl Future for LightComponents { type Item = (); - type Error = (); + type Error = super::Error; fn poll(&mut self) -> Poll { self.service.poll() diff --git a/substrate/core/service/src/lib.rs b/substrate/core/service/src/lib.rs index aece42145a..2b604fbc70 100644 --- a/substrate/core/service/src/lib.rs +++ b/substrate/core/service/src/lib.rs @@ -28,6 +28,7 @@ pub mod error; use std::io; use std::net::SocketAddr; use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; use futures::sync::mpsc; use parking_lot::Mutex; @@ -82,8 +83,14 @@ pub struct Service { NetworkStatus>, NetworkState )>>>>, transaction_pool: Arc>, + /// A future that resolves when the service has exited, this is useful to + /// make sure any internally spawned futures stop when the service does. exit: exit_future::Exit, + /// A signal that makes the exit future above resolve, fired on service drop. signal: Option, + /// Set to `true` when a spawned essential task has failed. The next time + /// the service future is polled it should complete with an error. + essential_failed: Arc, /// Sender for futures that must be spawned as background tasks. 
to_spawn_tx: mpsc::UnboundedSender + Send>>, /// Receiver for futures that must be spawned as background tasks. @@ -395,7 +402,7 @@ impl Service { // Telemetry let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { - let is_authority = config.roles == Roles::AUTHORITY; + let is_authority = config.roles.is_authority(); let network_id = network.local_peer_id().to_base58(); let name = config.name.clone(); let impl_name = config.impl_name.to_owned(); @@ -440,12 +447,13 @@ impl Service { network_status_sinks, select_chain, transaction_pool, + exit, signal: Some(signal), + essential_failed: Arc::new(AtomicBool::new(false)), to_spawn_tx, to_spawn_rx, to_poll: Vec::new(), config, - exit, rpc_handlers, _rpc: rpc, _telemetry: telemetry, @@ -491,6 +499,19 @@ impl Service { let _ = self.to_spawn_tx.unbounded_send(Box::new(task)); } + /// Spawns a task in the background that runs the future passed as + /// parameter. The given task is considered essential, i.e. if it errors we + /// trigger a service exit. + pub fn spawn_essential_task(&self, task: impl Future + Send + 'static) { + let essential_failed = self.essential_failed.clone(); + let essential_task = Box::new(task.map_err(move |_| { + error!("Essential task failed. Shutting down service."); + essential_failed.store(true, Ordering::Relaxed); + })); + + let _ = self.to_spawn_tx.unbounded_send(essential_task); + } + /// Returns a handle for spawning tasks. 
pub fn spawn_task_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { @@ -548,9 +569,13 @@ impl Service { impl Future for Service where Components: components::Components { type Item = (); - type Error = (); + type Error = Error; fn poll(&mut self) -> Poll { + if self.essential_failed.load(Ordering::Relaxed) { + return Err(Error::Other("Essential task failed.".into())); + } + while let Ok(Async::Ready(Some(task_to_spawn))) = self.to_spawn_rx.poll() { let executor = tokio_executor::DefaultExecutor::current(); if let Err(err) = executor.execute(task_to_spawn) { diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index 851ed8e5ec..1b3c43dae7 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -73,9 +73,9 @@ impl From for SyncService { } } -impl> Future for SyncService { +impl> Future for SyncService { type Item = (); - type Error = (); + type Error = service::Error; fn poll(&mut self) -> Poll { self.0.lock().unwrap().poll() @@ -195,8 +195,8 @@ fn node_config ( } impl TestNet where - F::FullService: Future, - F::LightService: Future + F::FullService: Future, + F::LightService: Future, { fn new( temp: &TempDir, @@ -239,7 +239,7 @@ impl TestNet where let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); let service = SyncService::from(F::new_full(node_config).expect("Error creating test node service")); - executor.spawn(service.clone()); + executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); ((index + nodes), service, addr) })); @@ -250,7 +250,7 @@ impl TestNet where let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); let service = SyncService::from(F::new_full(node_config).expect("Error creating test node service")); - executor.spawn(service.clone()); + executor.spawn(service.clone().map_err(|_| ())); let addr = 
addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); (index, service, addr) })); @@ -261,7 +261,7 @@ impl TestNet where let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); let service = SyncService::from(F::new_light(node_config).expect("Error creating test node service")); - executor.spawn(service.clone()); + executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); (index, service, addr) })); @@ -272,8 +272,8 @@ impl TestNet where } pub fn connectivity(spec: FactoryChainSpec) where - F::FullService: Future, - F::LightService: Future, + F::FullService: Future, + F::LightService: Future, { const NUM_FULL_NODES: usize = 5; const NUM_LIGHT_NODES: usize = 5; @@ -347,8 +347,8 @@ pub fn connectivity(spec: FactoryChainSpec) where pub fn sync(spec: FactoryChainSpec, mut block_factory: B, mut extrinsic_factory: E) where F: ServiceFactory, - F::FullService: Future, - F::LightService: Future, + F::FullService: Future, + F::LightService: Future, B: FnMut(&SyncService) -> BlockImportParams, E: FnMut(&SyncService) -> FactoryExtrinsic, { @@ -406,8 +406,8 @@ pub fn sync(spec: FactoryChainSpec, mut block_factory: B, mut extrin pub fn consensus(spec: FactoryChainSpec, authorities: Vec) where F: ServiceFactory, - F::FullService: Future, - F::LightService: Future, + F::FullService: Future, + F::LightService: Future, { const NUM_FULL_NODES: usize = 10; const NUM_LIGHT_NODES: usize = 0; diff --git a/substrate/node-template/src/cli.rs b/substrate/node-template/src/cli.rs index 5f94afddbc..4d672491c1 100644 --- a/substrate/node-template/src/cli.rs +++ b/substrate/node-template/src/cli.rs @@ -59,11 +59,11 @@ fn run_until_exit( mut runtime: Runtime, service: T, e: E, -) -> error::Result<()> - where - T: Deref> + Future + Send + 'static, - C: substrate_service::Components, - E: IntoExit, +) -> error::Result<()> where + T: Deref>, + T: Future + 
Send + 'static, + C: substrate_service::Components, + E: IntoExit, { let (exit_send, exit) = exit_future::signal(); @@ -74,10 +74,19 @@ fn run_until_exit( // but we need to keep holding a reference to the global telemetry guard let _telemetry = service.telemetry(); - let _ = runtime.block_on(service.select(e.into_exit())); + let service_res = { + let exit = e.into_exit().map_err(|_| error::Error::Other("Exit future failed.".into())); + let service = service.map_err(|err| error::Error::Service(err)); + let select = service.select(exit).map(|_| ()).map_err(|(err, _)| err); + runtime.block_on(select) + }; + exit_send.fire(); - Ok(()) + // TODO [andre]: timeout this future #1318 + let _ = runtime.shutdown_on_idle().wait(); + + service_res } // handles ctrl-c diff --git a/substrate/node-template/src/main.rs b/substrate/node-template/src/main.rs index 5418453a02..18e9638833 100644 --- a/substrate/node-template/src/main.rs +++ b/substrate/node-template/src/main.rs @@ -21,7 +21,7 @@ fn main() { }; if let Err(e) = cli::run(::std::env::args(), cli::Exit, version) { - eprintln!("Error starting the node: {}\n\n{:?}", e, e); + eprintln!("Fatal error: {}\n\n{:?}", e, e); std::process::exit(1) } } diff --git a/substrate/node/cli/src/lib.rs b/substrate/node/cli/src/lib.rs index 6639c4ad0b..4e3cfa7f01 100644 --- a/substrate/node/cli/src/lib.rs +++ b/substrate/node/cli/src/lib.rs @@ -223,11 +223,11 @@ fn run_until_exit( mut runtime: Runtime, service: T, e: E, -) -> error::Result<()> - where - T: Deref> + Future + Send + 'static, - C: substrate_service::Components, - E: IntoExit, +) -> error::Result<()> where + T: Deref>, + T: Future + Send + 'static, + C: substrate_service::Components, + E: IntoExit, { let (exit_send, exit) = exit_future::signal(); @@ -238,11 +238,17 @@ fn run_until_exit( // but we need to keep holding a reference to the global telemetry guard let _telemetry = service.telemetry(); - let _ = runtime.block_on(service.select(e.into_exit())); + let service_res = { + let 
exit = e.into_exit().map_err(|_| error::Error::Other("Exit future failed.".into())); + let service = service.map_err(|err| error::Error::Service(err)); + let select = service.select(exit).map(|_| ()).map_err(|(err, _)| err); + runtime.block_on(select) + }; + exit_send.fire(); // TODO [andre]: timeout this future #1318 let _ = runtime.shutdown_on_idle().wait(); - Ok(()) + service_res } diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index 35f813efb3..0a041e94d3 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -152,7 +152,10 @@ construct_service_factory! { let babe = start_babe(babe_config)?; let select = babe.select(service.on_exit()).then(|_| Ok(())); - service.spawn_task(Box::new(select)); + + // the BABE authoring task is considered infallible, i.e. if it + // fails we take down the service with it. + service.spawn_essential_task(select); } let config = grandpa::Config { @@ -187,7 +190,10 @@ construct_service_factory! { on_exit: service.on_exit(), telemetry_on_connect: Some(telemetry_on_connect), }; - service.spawn_task(Box::new(grandpa::run_grandpa_voter(grandpa_config)?)); + + // the GRANDPA voter task is considered infallible, i.e. + // if it fails we take down the service with it. + service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?); }, (_, true) => { grandpa::setup_disabled_grandpa( diff --git a/substrate/node/src/main.rs b/substrate/node/src/main.rs index 15b603e7a2..ca4a6b4c60 100644 --- a/substrate/node/src/main.rs +++ b/substrate/node/src/main.rs @@ -55,7 +55,7 @@ fn main() { }; if let Err(e) = cli::run(::std::env::args(), Exit, version) { - eprintln!("Error starting the node: {}\n\n{:?}", e, e); + eprintln!("Fatal error: {}\n\n{:?}", e, e); std::process::exit(1) } }