overseer: observe stalled subsystems and shut down (#2148)

* overseer: observe stalled subsystems and shut down

* notify on send_message failure as well
This commit is contained in:
Robert Habermeier
2020-12-20 15:30:02 -05:00
committed by GitHub
parent 3e7b3ff4b5
commit e95be77eb6
2 changed files with 53 additions and 27 deletions
+50 -27
View File
@@ -90,7 +90,7 @@ pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem, ActiveLeavesUpdate, DummySubsystem,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_subsystem_util::{TimeoutExt, metrics::{self, prometheus}};
use polkadot_node_primitives::SpawnNamed;
@@ -289,6 +289,7 @@ impl Debug for ToOverseer {
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M> {
tx: mpsc::Sender<FromOverseer<M>>,
name: &'static str,
}
/// A context type that is given to the [`Subsystem`] upon spawning.
@@ -389,22 +390,41 @@ impl<M> OverseenSubsystem<M> {
///
/// If the inner `instance` is `None`, nothing is happening.
async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
if let Some(ref mut instance) = self.instance {
instance.tx.send(FromOverseer::Communication { msg }).await?;
}
const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);
Ok(())
if let Some(ref mut instance) = self.instance {
match instance.tx.send(
FromOverseer::Communication { msg }
).timeout(MESSAGE_TIMEOUT).await
{
None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
Err(SubsystemError::SubsystemStalled(instance.name))
}
Some(res) => res.map_err(Into::into),
}
} else {
Ok(())
}
}
/// Send a signal to the wrapped subsystem.
///
/// If the inner `instance` is `None`, nothing is happening.
async fn send_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
if let Some(ref mut instance) = self.instance {
instance.tx.send(FromOverseer::Signal(signal)).await?;
}
const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10);
Ok(())
if let Some(ref mut instance) = self.instance {
match instance.tx.send(FromOverseer::Signal(signal)).timeout(SIGNAL_TIMEOUT).await {
None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
Err(SubsystemError::SubsystemStalled(instance.name))
}
Some(res) => res.map_err(Into::into),
}
} else {
Ok(())
}
}
}
@@ -1319,7 +1339,7 @@ where
match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg).await;
self.route_message(msg).await?;
}
Event::Stop => {
self.stop().await;
@@ -1344,7 +1364,7 @@ where
};
match msg {
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await?,
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
}
@@ -1445,55 +1465,57 @@ where
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn route_message(&mut self, msg: AllMessages) {
async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.metrics.on_message_relayed();
match msg {
AllMessages::CandidateValidation(msg) => {
let _ = self.candidate_validation_subsystem.send_message(msg).await;
self.candidate_validation_subsystem.send_message(msg).await?;
},
AllMessages::CandidateBacking(msg) => {
let _ = self.candidate_backing_subsystem.send_message(msg).await;
self.candidate_backing_subsystem.send_message(msg).await?;
},
AllMessages::CandidateSelection(msg) => {
let _ = self.candidate_selection_subsystem.send_message(msg).await;
self.candidate_selection_subsystem.send_message(msg).await?;
},
AllMessages::StatementDistribution(msg) => {
let _ = self.statement_distribution_subsystem.send_message(msg).await;
self.statement_distribution_subsystem.send_message(msg).await?;
},
AllMessages::AvailabilityDistribution(msg) => {
let _ = self.availability_distribution_subsystem.send_message(msg).await;
self.availability_distribution_subsystem.send_message(msg).await?;
},
AllMessages::BitfieldDistribution(msg) => {
let _ = self.bitfield_distribution_subsystem.send_message(msg).await;
self.bitfield_distribution_subsystem.send_message(msg).await?;
},
AllMessages::BitfieldSigning(msg) => {
let _ = self.bitfield_signing_subsystem.send_message(msg).await;
self.bitfield_signing_subsystem.send_message(msg).await?;
},
AllMessages::Provisioner(msg) => {
let _ = self.provisioner_subsystem.send_message(msg).await;
self.provisioner_subsystem.send_message(msg).await?;
},
AllMessages::PoVDistribution(msg) => {
let _ = self.pov_distribution_subsystem.send_message(msg).await;
self.pov_distribution_subsystem.send_message(msg).await?;
},
AllMessages::RuntimeApi(msg) => {
let _ = self.runtime_api_subsystem.send_message(msg).await;
self.runtime_api_subsystem.send_message(msg).await?;
},
AllMessages::AvailabilityStore(msg) => {
let _ = self.availability_store_subsystem.send_message(msg).await;
self.availability_store_subsystem.send_message(msg).await?;
},
AllMessages::NetworkBridge(msg) => {
let _ = self.network_bridge_subsystem.send_message(msg).await;
self.network_bridge_subsystem.send_message(msg).await?;
},
AllMessages::ChainApi(msg) => {
let _ = self.chain_api_subsystem.send_message(msg).await;
self.chain_api_subsystem.send_message(msg).await?;
},
AllMessages::CollationGeneration(msg) => {
let _ = self.collation_generation_subsystem.send_message(msg).await;
self.collation_generation_subsystem.send_message(msg).await?;
},
AllMessages::CollatorProtocol(msg) => {
let _ = self.collator_protocol_subsystem.send_message(msg).await;
self.collator_protocol_subsystem.send_message(msg).await?;
},
}
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
@@ -1577,6 +1599,7 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
let instance = Some(SubsystemInstance {
tx: to_tx,
name,
});
Ok(OverseenSubsystem {
+3
View File
@@ -144,6 +144,9 @@ pub enum SubsystemError {
#[error("Failed to {0}")]
Context(String),
#[error("Subsystem stalled: {0}")]
SubsystemStalled(&'static str),
/// Per origin (or subsystem) annotations to wrap an error.
#[error("Error originated in {origin}")]
FromOrigin {