mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 07:01:05 +00:00
Use SpawnNamed instead of Spawn in Overseer (#1430)
* Use SpawnNamed instead of Spawn in Overseer * reexport SpawnNamed and fix doc tests * Fix deps
This commit is contained in:
+128
-104
@@ -64,9 +64,8 @@ use std::collections::HashSet;
|
||||
use futures::channel::{mpsc, oneshot};
|
||||
use futures::{
|
||||
pending, poll, select,
|
||||
future::{BoxFuture, RemoteHandle},
|
||||
future::BoxFuture,
|
||||
stream::{self, FuturesUnordered},
|
||||
task::{Spawn, SpawnExt},
|
||||
Future, FutureExt, SinkExt, StreamExt,
|
||||
};
|
||||
use futures_timer::Delay;
|
||||
@@ -86,6 +85,7 @@ pub use polkadot_subsystem::{
|
||||
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
|
||||
SpawnedSubsystem,
|
||||
};
|
||||
use polkadot_node_primitives::SpawnNamed;
|
||||
|
||||
|
||||
// A capacity of bounded channels inside the overseer.
|
||||
@@ -109,8 +109,8 @@ enum ToOverseer {
|
||||
/// spawn on the overseer and a `oneshot::Sender` to signal the result
|
||||
/// of the spawn.
|
||||
SpawnJob {
|
||||
name: &'static str,
|
||||
s: BoxFuture<'static, ()>,
|
||||
res: oneshot::Sender<SubsystemResult<()>>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -279,14 +279,15 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
|
||||
self.rx.next().await.ok_or(SubsystemError)
|
||||
}
|
||||
|
||||
async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
|
||||
-> SubsystemResult<()>
|
||||
{
|
||||
self.tx.send(ToOverseer::SpawnJob {
|
||||
name,
|
||||
s,
|
||||
res: tx,
|
||||
}).await?;
|
||||
|
||||
rx.await?
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
|
||||
@@ -322,7 +323,7 @@ struct OverseenSubsystem<M> {
|
||||
}
|
||||
|
||||
/// The `Overseer` itself.
|
||||
pub struct Overseer<S: Spawn> {
|
||||
pub struct Overseer<S: SpawnNamed> {
|
||||
/// A candidate validation subsystem.
|
||||
candidate_validation_subsystem: OverseenSubsystem<CandidateValidationMessage>,
|
||||
|
||||
@@ -361,7 +362,7 @@ pub struct Overseer<S: Spawn> {
|
||||
s: S,
|
||||
|
||||
/// Here we keep handles to spawned subsystems to be notified when they terminate.
|
||||
running_subsystems: FuturesUnordered<RemoteHandle<()>>,
|
||||
running_subsystems: FuturesUnordered<BoxFuture<'static, ()>>,
|
||||
|
||||
/// Gather running subsystms' outbound streams into one.
|
||||
running_subsystems_rx: StreamUnordered<mpsc::Receiver<ToOverseer>>,
|
||||
@@ -416,7 +417,7 @@ pub struct AllSubsystems<CV, CB, CS, SD, AD, BD, P, PoVD, RA, AS, NB> {
|
||||
|
||||
impl<S> Overseer<S>
|
||||
where
|
||||
S: Spawn,
|
||||
S: SpawnNamed,
|
||||
{
|
||||
/// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s.
|
||||
///
|
||||
@@ -467,16 +468,19 @@ where
|
||||
/// self,
|
||||
/// mut ctx: C,
|
||||
/// ) -> SpawnedSubsystem {
|
||||
/// SpawnedSubsystem(Box::pin(async move {
|
||||
/// loop {
|
||||
/// Delay::new(Duration::from_secs(1)).await;
|
||||
/// }
|
||||
/// }))
|
||||
/// SpawnedSubsystem {
|
||||
/// name: "validation-subsystem",
|
||||
/// future: Box::pin(async move {
|
||||
/// loop {
|
||||
/// Delay::new(Duration::from_secs(1)).await;
|
||||
/// }
|
||||
/// }),
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// # fn main() { executor::block_on(async move {
|
||||
/// let spawner = executor::ThreadPool::new().unwrap();
|
||||
/// let spawner = sp_core::testing::SpawnBlockingExecutor::new();
|
||||
/// let all_subsystems = AllSubsystems {
|
||||
/// candidate_validation: ValidationSubsystem,
|
||||
/// candidate_backing: DummySubsystem,
|
||||
@@ -737,10 +741,8 @@ where
|
||||
) {
|
||||
match msg {
|
||||
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
|
||||
ToOverseer::SpawnJob { s, res } => {
|
||||
let s = self.spawn_job(s);
|
||||
|
||||
let _ = res.send(s);
|
||||
ToOverseer::SpawnJob { name, s } => {
|
||||
self.spawn_job(name, s);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -897,26 +899,33 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
|
||||
self.s.spawn(j).map_err(|_| SubsystemError)
|
||||
fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
|
||||
self.s.spawn(name, j);
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn<S: Spawn, M: Send + 'static>(
|
||||
fn spawn<S: SpawnNamed, M: Send + 'static>(
|
||||
spawner: &mut S,
|
||||
futures: &mut FuturesUnordered<RemoteHandle<()>>,
|
||||
futures: &mut FuturesUnordered<BoxFuture<'static, ()>>,
|
||||
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
|
||||
s: impl Subsystem<OverseerSubsystemContext<M>>,
|
||||
) -> SubsystemResult<OverseenSubsystem<M>> {
|
||||
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
|
||||
let f = s.start(ctx);
|
||||
let SpawnedSubsystem { future, name } = s.start(ctx);
|
||||
|
||||
let handle = spawner.spawn_with_handle(f.0)?;
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let fut = Box::pin(async move {
|
||||
future.await;
|
||||
let _ = tx.send(());
|
||||
});
|
||||
|
||||
spawner.spawn(name, fut);
|
||||
|
||||
streams.push(from_rx);
|
||||
futures.push(handle);
|
||||
futures.push(Box::pin(rx.map(|_| ())));
|
||||
|
||||
let instance = Some(SubsystemInstance {
|
||||
tx: to_tx,
|
||||
@@ -944,21 +953,24 @@ mod tests {
|
||||
{
|
||||
fn start(self, mut ctx: C) -> SpawnedSubsystem {
|
||||
let mut sender = self.0;
|
||||
SpawnedSubsystem(Box::pin(async move {
|
||||
let mut i = 0;
|
||||
loop {
|
||||
match ctx.recv().await {
|
||||
Ok(FromOverseer::Communication { .. }) => {
|
||||
let _ = sender.send(i).await;
|
||||
i += 1;
|
||||
continue;
|
||||
SpawnedSubsystem {
|
||||
name: "test-subsystem-1",
|
||||
future: Box::pin(async move {
|
||||
let mut i = 0;
|
||||
loop {
|
||||
match ctx.recv().await {
|
||||
Ok(FromOverseer::Communication { .. }) => {
|
||||
let _ = sender.send(i).await;
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
}
|
||||
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}))
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -969,39 +981,42 @@ mod tests {
|
||||
{
|
||||
fn start(self, mut ctx: C) -> SpawnedSubsystem {
|
||||
let sender = self.0.clone();
|
||||
SpawnedSubsystem(Box::pin(async move {
|
||||
let _sender = sender;
|
||||
let mut c: usize = 0;
|
||||
loop {
|
||||
if c < 10 {
|
||||
let (tx, _) = oneshot::channel();
|
||||
ctx.send_message(
|
||||
AllMessages::CandidateValidation(
|
||||
CandidateValidationMessage::ValidateFromChainState(
|
||||
Default::default(),
|
||||
PoV {
|
||||
block_data: BlockData(Vec::new()),
|
||||
}.into(),
|
||||
tx,
|
||||
SpawnedSubsystem {
|
||||
name: "test-subsystem-2",
|
||||
future: Box::pin(async move {
|
||||
let _sender = sender;
|
||||
let mut c: usize = 0;
|
||||
loop {
|
||||
if c < 10 {
|
||||
let (tx, _) = oneshot::channel();
|
||||
ctx.send_message(
|
||||
AllMessages::CandidateValidation(
|
||||
CandidateValidationMessage::ValidateFromChainState(
|
||||
Default::default(),
|
||||
PoV {
|
||||
block_data: BlockData(Vec::new()),
|
||||
}.into(),
|
||||
tx,
|
||||
)
|
||||
)
|
||||
)
|
||||
).await.unwrap();
|
||||
c += 1;
|
||||
continue;
|
||||
}
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
|
||||
break;
|
||||
}
|
||||
Ok(Some(_)) => {
|
||||
).await.unwrap();
|
||||
c += 1;
|
||||
continue;
|
||||
}
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
|
||||
break;
|
||||
}
|
||||
Ok(Some(_)) => {
|
||||
continue;
|
||||
}
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
}))
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1011,16 +1026,19 @@ mod tests {
|
||||
where C: SubsystemContext<Message=CandidateBackingMessage>
|
||||
{
|
||||
fn start(self, mut _ctx: C) -> SpawnedSubsystem {
|
||||
SpawnedSubsystem(Box::pin(async move {
|
||||
// Do nothing and exit.
|
||||
}))
|
||||
SpawnedSubsystem {
|
||||
name: "test-subsystem-4",
|
||||
future: Box::pin(async move {
|
||||
// Do nothing and exit.
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Checks that a minimal configuration of two jobs can run and exchange messages.
|
||||
#[test]
|
||||
fn overseer_works() {
|
||||
let spawner = executor::ThreadPool::new().unwrap();
|
||||
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
|
||||
|
||||
executor::block_on(async move {
|
||||
let (s1_tx, mut s1_rx) = mpsc::channel(64);
|
||||
@@ -1084,7 +1102,7 @@ mod tests {
|
||||
// Should immediately conclude the overseer itself with an error.
|
||||
#[test]
|
||||
fn overseer_panics_on_subsystem_exit() {
|
||||
let spawner = executor::ThreadPool::new().unwrap();
|
||||
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
|
||||
|
||||
executor::block_on(async move {
|
||||
let (s1_tx, _) = mpsc::channel(64);
|
||||
@@ -1124,21 +1142,24 @@ mod tests {
|
||||
fn start(self, mut ctx: C) -> SpawnedSubsystem {
|
||||
let mut sender = self.0.clone();
|
||||
|
||||
SpawnedSubsystem(Box::pin(async move {
|
||||
loop {
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
|
||||
Ok(Some(FromOverseer::Signal(s))) => {
|
||||
sender.send(s).await.unwrap();
|
||||
continue;
|
||||
},
|
||||
Ok(Some(_)) => continue,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
SpawnedSubsystem {
|
||||
name: "test-subsystem-5",
|
||||
future: Box::pin(async move {
|
||||
loop {
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
|
||||
Ok(Some(FromOverseer::Signal(s))) => {
|
||||
sender.send(s).await.unwrap();
|
||||
continue;
|
||||
},
|
||||
Ok(Some(_)) => continue,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
}))
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1150,21 +1171,24 @@ mod tests {
|
||||
fn start(self, mut ctx: C) -> SpawnedSubsystem {
|
||||
let mut sender = self.0.clone();
|
||||
|
||||
SpawnedSubsystem(Box::pin(async move {
|
||||
loop {
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
|
||||
Ok(Some(FromOverseer::Signal(s))) => {
|
||||
sender.send(s).await.unwrap();
|
||||
continue;
|
||||
},
|
||||
Ok(Some(_)) => continue,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
SpawnedSubsystem {
|
||||
name: "test-subsystem-6",
|
||||
future: Box::pin(async move {
|
||||
loop {
|
||||
match ctx.try_recv().await {
|
||||
Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
|
||||
Ok(Some(FromOverseer::Signal(s))) => {
|
||||
sender.send(s).await.unwrap();
|
||||
continue;
|
||||
},
|
||||
Ok(Some(_)) => continue,
|
||||
Err(_) => return,
|
||||
_ => (),
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
pending!();
|
||||
}
|
||||
}))
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1172,7 +1196,7 @@ mod tests {
|
||||
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
|
||||
#[test]
|
||||
fn overseer_start_stop_works() {
|
||||
let spawner = executor::ThreadPool::new().unwrap();
|
||||
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
|
||||
|
||||
executor::block_on(async move {
|
||||
let first_block_hash = [1; 32].into();
|
||||
@@ -1267,7 +1291,7 @@ mod tests {
|
||||
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
|
||||
#[test]
|
||||
fn overseer_finalize_works() {
|
||||
let spawner = executor::ThreadPool::new().unwrap();
|
||||
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
|
||||
|
||||
executor::block_on(async move {
|
||||
let first_block_hash = [1; 32].into();
|
||||
|
||||
Reference in New Issue
Block a user