mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 00:41:08 +00:00
Change SpawnedSubsystem type to log subsystem errors (#1878)
* Change SpawnedSubsystem type to log subsystem errors * Remove clone
This commit is contained in:
@@ -165,7 +165,10 @@ where
|
||||
Context: SubsystemContext<Message = CollationGenerationMessage>,
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let future = Box::pin(self.run(ctx));
|
||||
let future = Box::pin(async move {
|
||||
self.run(ctx).await;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
SpawnedSubsystem {
|
||||
name: "collation-generation-subsystem",
|
||||
|
||||
@@ -27,7 +27,7 @@ use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};
|
||||
|
||||
use codec::{Encode, Decode};
|
||||
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt};
|
||||
use futures::{select, channel::oneshot, future::{self, Either}, Future, FutureExt, TryFutureExt};
|
||||
use futures_timer::Delay;
|
||||
use kvdb_rocksdb::{Database, DatabaseConfig};
|
||||
use kvdb::{KeyValueDB, DBTransaction};
|
||||
@@ -969,9 +969,7 @@ where
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let future = run(self, ctx)
|
||||
.map(|r| if let Err(e) = r {
|
||||
log::error!(target: "availabilitystore", "Subsystem exited with an error {:?}", e);
|
||||
})
|
||||
.map_err(|e| SubsystemError::with_origin("availability-store", e))
|
||||
.boxed();
|
||||
|
||||
SpawnedSubsystem {
|
||||
|
||||
@@ -121,7 +121,6 @@ impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
|
||||
fn start(self, ctx: C) -> SpawnedSubsystem {
|
||||
let future = run(ctx, self.spawn, self.metrics)
|
||||
.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
|
||||
.map(|_| ())
|
||||
.boxed();
|
||||
SpawnedSubsystem {
|
||||
name: "candidate-validation-subsystem",
|
||||
|
||||
@@ -67,7 +67,6 @@ impl<Client, Context> Subsystem<Context> for ChainApiSubsystem<Client> where
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let future = run(ctx, self)
|
||||
.map_err(|e| SubsystemError::with_origin("chain-api", e))
|
||||
.map(|_| ())
|
||||
.boxed();
|
||||
SpawnedSubsystem {
|
||||
future,
|
||||
|
||||
@@ -60,7 +60,7 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
SpawnedSubsystem {
|
||||
future: run(ctx, self).map(|_| ()).boxed(),
|
||||
future: run(ctx, self).boxed(),
|
||||
name: "runtime-api-subsystem",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -823,7 +823,6 @@ where
|
||||
let future = self
|
||||
.run(ctx)
|
||||
.map_err(|e| SubsystemError::with_origin("availability-distribution", e))
|
||||
.map(|_| ())
|
||||
.boxed();
|
||||
|
||||
SpawnedSubsystem {
|
||||
|
||||
@@ -585,7 +585,7 @@ where
|
||||
.map_err(|e| {
|
||||
SubsystemError::with_origin("bitfield-distribution", e)
|
||||
})
|
||||
.map(|_| ()).boxed();
|
||||
.boxed();
|
||||
|
||||
SpawnedSubsystem {
|
||||
name: "bitfield-distribution-subsystem",
|
||||
|
||||
@@ -231,7 +231,6 @@ impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
|
||||
.map_err(|e| {
|
||||
SubsystemError::with_origin("network-bridge", e)
|
||||
})
|
||||
.map(|_| ())
|
||||
.boxed();
|
||||
SpawnedSubsystem {
|
||||
name: "network-bridge-subsystem",
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
#![deny(missing_docs, unused_crate_dependencies)]
|
||||
|
||||
use std::time::Duration;
|
||||
use futures::{channel::oneshot, FutureExt};
|
||||
use futures::{channel::oneshot, FutureExt, TryFutureExt};
|
||||
use log::trace;
|
||||
use thiserror::Error;
|
||||
|
||||
@@ -122,9 +122,14 @@ where
|
||||
Context: SubsystemContext<Message = CollatorProtocolMessage> + Sync + Send,
|
||||
{
|
||||
fn start(self, ctx: Context) -> SpawnedSubsystem {
|
||||
let future = self
|
||||
.run(ctx)
|
||||
.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
|
||||
.boxed();
|
||||
|
||||
SpawnedSubsystem {
|
||||
name: "collator-protocol-subsystem",
|
||||
future: self.run(ctx).map(|_| ()).boxed(),
|
||||
future,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,6 @@ impl<C> Subsystem<C> for PoVDistribution
|
||||
// within `run`.
|
||||
let future = self.run(ctx)
|
||||
.map_err(|e| SubsystemError::with_origin("pov-distribution", e))
|
||||
.map(|_| ())
|
||||
.boxed();
|
||||
SpawnedSubsystem {
|
||||
name: "pov-distribution-subsystem",
|
||||
@@ -616,4 +615,4 @@ impl metrics::Metrics for Metrics {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
mod tests;
|
||||
|
||||
@@ -81,7 +81,7 @@ impl<C> Subsystem<C> for StatementDistribution
|
||||
// within `run`.
|
||||
SpawnedSubsystem {
|
||||
name: "statement-distribution-subsystem",
|
||||
future: self.run(ctx).map(|_| ()).boxed(),
|
||||
future: self.run(ctx).boxed(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ impl<C> Subsystem<C> for Subsystem1
|
||||
fn start(self, ctx: C) -> SpawnedSubsystem {
|
||||
let future = Box::pin(async move {
|
||||
Self::run(ctx).await;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
SpawnedSubsystem {
|
||||
@@ -121,6 +122,7 @@ impl<C> Subsystem<C> for Subsystem2
|
||||
fn start(self, ctx: C) -> SpawnedSubsystem {
|
||||
let future = Box::pin(async move {
|
||||
Self::run(ctx).await;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
SpawnedSubsystem {
|
||||
|
||||
@@ -1606,7 +1606,9 @@ fn spawn<S: SpawnNamed, M: Send + 'static>(
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let fut = Box::pin(async move {
|
||||
future.await;
|
||||
if let Err(e) = future.await {
|
||||
log::error!("Subsystem {} exited with error {:?}", name, e);
|
||||
}
|
||||
let _ = tx.send(());
|
||||
});
|
||||
|
||||
@@ -1658,8 +1660,8 @@ mod tests {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
|
||||
Err(_) => return,
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
|
||||
Err(_) => return Ok(()),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
@@ -1704,11 +1706,13 @@ mod tests {
|
||||
Ok(Some(_)) => {
|
||||
continue;
|
||||
}
|
||||
Err(_) => return,
|
||||
Err(_) => return Ok(()),
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -1724,6 +1728,7 @@ mod tests {
|
||||
name: "test-subsystem-4",
|
||||
future: Box::pin(async move {
|
||||
// Do nothing and exit.
|
||||
Ok(())
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -1902,11 +1907,13 @@ mod tests {
|
||||
continue;
|
||||
},
|
||||
Ok(Some(_)) => continue,
|
||||
Err(_) => return,
|
||||
Err(_) => break,
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -1931,11 +1938,13 @@ mod tests {
|
||||
continue;
|
||||
},
|
||||
Ok(Some(_)) => continue,
|
||||
Err(_) => return,
|
||||
Err(_) => break,
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}),
|
||||
}
|
||||
}
|
||||
@@ -2180,6 +2189,8 @@ mod tests {
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -303,11 +303,11 @@ impl<C: SubsystemContext<Message = Msg>, Msg: Send + 'static> Subsystem<C> for F
|
||||
let future = Box::pin(async move {
|
||||
loop {
|
||||
match ctx.recv().await {
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
|
||||
Ok(FromOverseer::Communication { msg }) => {
|
||||
let _ = self.0.send(msg).await;
|
||||
},
|
||||
Err(_) => return,
|
||||
Err(_) => return Ok(()),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -942,6 +942,7 @@ where
|
||||
|
||||
let future = Box::pin(async move {
|
||||
Self::run(ctx, run_args, metrics, spawner, errors).await;
|
||||
Ok(())
|
||||
});
|
||||
|
||||
SpawnedSubsystem {
|
||||
|
||||
@@ -164,7 +164,7 @@ pub struct SpawnedSubsystem {
|
||||
/// Name of the subsystem being spawned.
|
||||
pub name: &'static str,
|
||||
/// The task of the subsystem being spawned.
|
||||
pub future: BoxFuture<'static, ()>,
|
||||
pub future: BoxFuture<'static, SubsystemResult<()>>,
|
||||
}
|
||||
|
||||
/// A `Result` type that wraps [`SubsystemError`].
|
||||
@@ -233,8 +233,8 @@ impl<C: SubsystemContext> Subsystem<C> for DummySubsystem {
|
||||
let future = Box::pin(async move {
|
||||
loop {
|
||||
match ctx.recv().await {
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
|
||||
Err(_) => return,
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
|
||||
Err(_) => return Ok(()),
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user