From e95be77eb66208555950703115b59d001646c504 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 20 Dec 2020 15:30:02 -0500 Subject: [PATCH] overseer: observe stalled subsystems and shut down (#2148) * overseer: observe stalled subsystems and shut down * notify on send_message failure as well --- polkadot/node/overseer/src/lib.rs | 77 +++++++++++++++++++----------- polkadot/node/subsystem/src/lib.rs | 3 ++ 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index d802ed451d..9e5ff2a754 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -90,7 +90,7 @@ pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem, }; -use polkadot_node_subsystem_util::metrics::{self, prometheus}; +use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}}; use polkadot_node_primitives::SpawnNamed; @@ -289,6 +289,7 @@ impl Debug for ToOverseer { /// [`Subsystem`]: trait.Subsystem.html struct SubsystemInstance { tx: mpsc::Sender>, + name: &'static str, } /// A context type that is given to the [`Subsystem`] upon spawning. @@ -389,22 +390,41 @@ impl OverseenSubsystem { /// /// If the inner `instance` is `None`, nothing is happening. async fn send_message(&mut self, msg: M) -> SubsystemResult<()> { - if let Some(ref mut instance) = self.instance { - instance.tx.send(FromOverseer::Communication { msg }).await?; - } + const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); - Ok(()) + if let Some(ref mut instance) = self.instance { + match instance.tx.send( + FromOverseer::Communication { msg } + ).timeout(MESSAGE_TIMEOUT).await + { + None => { + tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); + Err(SubsystemError::SubsystemStalled(instance.name)) + } + Some(res) => res.map_err(Into::into), + } + } else { + Ok(()) + } } /// Send a signal to the wrapped subsystem. /// /// If the inner `instance` is `None`, nothing is happening. async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { - if let Some(ref mut instance) = self.instance { - instance.tx.send(FromOverseer::Signal(signal)).await?; - } + const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10); - Ok(()) + if let Some(ref mut instance) = self.instance { + match instance.tx.send(FromOverseer::Signal(signal)).timeout(SIGNAL_TIMEOUT).await { + None => { + tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); + Err(SubsystemError::SubsystemStalled(instance.name)) + } + Some(res) => res.map_err(Into::into), + } + } else { + Ok(()) + } } } @@ -1319,7 +1339,7 @@ where match msg { Event::MsgToSubsystem(msg) => { - self.route_message(msg).await; + self.route_message(msg).await?; } Event::Stop => { self.stop().await; @@ -1344,7 +1364,7 @@ where }; match msg { - ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await, + ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await?, ToOverseer::SpawnJob { name, s } => { self.spawn_job(name, s); } @@ -1445,55 +1465,57 @@ where } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] - async fn route_message(&mut self, msg: AllMessages) { + async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { - let _ = self.candidate_validation_subsystem.send_message(msg).await; + self.candidate_validation_subsystem.send_message(msg).await?; }, AllMessages::CandidateBacking(msg) => { - let _ = self.candidate_backing_subsystem.send_message(msg).await; + self.candidate_backing_subsystem.send_message(msg).await?; }, AllMessages::CandidateSelection(msg) => { - let _ = self.candidate_selection_subsystem.send_message(msg).await; + self.candidate_selection_subsystem.send_message(msg).await?; }, AllMessages::StatementDistribution(msg) => { - let _ = self.statement_distribution_subsystem.send_message(msg).await; + self.statement_distribution_subsystem.send_message(msg).await?; }, AllMessages::AvailabilityDistribution(msg) => { - let _ = self.availability_distribution_subsystem.send_message(msg).await; + self.availability_distribution_subsystem.send_message(msg).await?; }, AllMessages::BitfieldDistribution(msg) => { - let _ = self.bitfield_distribution_subsystem.send_message(msg).await; + self.bitfield_distribution_subsystem.send_message(msg).await?; }, AllMessages::BitfieldSigning(msg) => { - let _ = self.bitfield_signing_subsystem.send_message(msg).await; + self.bitfield_signing_subsystem.send_message(msg).await?; }, AllMessages::Provisioner(msg) => { - let _ = self.provisioner_subsystem.send_message(msg).await; + self.provisioner_subsystem.send_message(msg).await?; }, AllMessages::PoVDistribution(msg) => { - let _ = self.pov_distribution_subsystem.send_message(msg).await; + self.pov_distribution_subsystem.send_message(msg).await?; }, AllMessages::RuntimeApi(msg) => { - let _ = self.runtime_api_subsystem.send_message(msg).await; + self.runtime_api_subsystem.send_message(msg).await?; }, AllMessages::AvailabilityStore(msg) => { - let _ = self.availability_store_subsystem.send_message(msg).await; + self.availability_store_subsystem.send_message(msg).await?; }, AllMessages::NetworkBridge(msg) => { - let _ = self.network_bridge_subsystem.send_message(msg).await; + self.network_bridge_subsystem.send_message(msg).await?; }, AllMessages::ChainApi(msg) => { - let _ = self.chain_api_subsystem.send_message(msg).await; + self.chain_api_subsystem.send_message(msg).await?; }, AllMessages::CollationGeneration(msg) => { - let _ = self.collation_generation_subsystem.send_message(msg).await; + self.collation_generation_subsystem.send_message(msg).await?; }, AllMessages::CollatorProtocol(msg) => { - let _ = self.collator_protocol_subsystem.send_message(msg).await; + self.collator_protocol_subsystem.send_message(msg).await?; }, } + + Ok(()) } #[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))] @@ -1577,6 +1599,7 @@ fn spawn( let instance = Some(SubsystemInstance { tx: to_tx, + name, }); Ok(OverseenSubsystem { diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs index 8c0fa1fecf..83b72111d4 100644 --- a/polkadot/node/subsystem/src/lib.rs +++ b/polkadot/node/subsystem/src/lib.rs @@ -144,6 +144,9 @@ pub enum SubsystemError { #[error("Failed to {0}")] Context(String), + #[error("Subsystem stalled: {0}")] + SubsystemStalled(&'static str), + /// Per origin (or subsystem) annotations to wrap an error. #[error("Error originated in {origin}")] FromOrigin {