Subsystem::start takes self by-value (#1325)

* Subsystem::start takes self by-value

* fix doc-test compilation
This commit is contained in:
Robert Habermeier
2020-06-30 15:16:37 -04:00
committed by GitHub
parent cb59245085
commit 2a3e607d14
5 changed files with 38 additions and 46 deletions
+6 -14
View File
@@ -164,7 +164,7 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
} }
/// The network bridge subsystem. /// The network bridge subsystem.
pub struct NetworkBridge<N>(Option<N>); pub struct NetworkBridge<N>(N);
impl<N> NetworkBridge<N> { impl<N> NetworkBridge<N> {
/// Create a new network bridge subsystem with underlying network service. /// Create a new network bridge subsystem with underlying network service.
@@ -172,7 +172,7 @@ impl<N> NetworkBridge<N> {
/// This assumes that the network service has had the notifications protocol for the network /// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info). /// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info).
pub fn new(net_service: N) -> Self { pub fn new(net_service: N) -> Self {
NetworkBridge(Some(net_service)) NetworkBridge(net_service)
} }
} }
@@ -181,18 +181,10 @@ impl<Net, Context> Subsystem<Context> for NetworkBridge<Net>
Net: Network, Net: Network,
Context: SubsystemContext<Message=NetworkBridgeMessage>, Context: SubsystemContext<Message=NetworkBridgeMessage>,
{ {
fn start(&mut self, mut ctx: Context) -> SpawnedSubsystem { fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem(match self.0.take() { // Swallow error because failure is fatal to the node and we log with more precision
None => async move { for _ in ctx.recv().await { } }.boxed(), // within `run_network`.
Some(net) => { SpawnedSubsystem(run_network(self.0, ctx).map(|_| ()).boxed())
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
run_network(net, ctx).map(|_| ()).boxed()
}
})
} }
} }
@@ -74,7 +74,7 @@ impl Subsystem1 {
impl<C> Subsystem<C> for Subsystem1 impl<C> Subsystem<C> for Subsystem1
where C: SubsystemContext<Message=CandidateBackingMessage> where C: SubsystemContext<Message=CandidateBackingMessage>
{ {
fn start(&mut self, ctx: C) -> SpawnedSubsystem { fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await; Self::run(ctx).await;
})) }))
@@ -111,7 +111,7 @@ impl Subsystem2 {
impl<C> Subsystem<C> for Subsystem2 impl<C> Subsystem<C> for Subsystem2
where C: SubsystemContext<Message=CandidateValidationMessage> where C: SubsystemContext<Message=CandidateValidationMessage>
{ {
fn start(&mut self, ctx: C) -> SpawnedSubsystem { fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await; Self::run(ctx).await;
})) }))
@@ -129,8 +129,8 @@ fn main() {
let (overseer, _handler) = Overseer::new( let (overseer, _handler) = Overseer::new(
vec![], vec![],
Box::new(Subsystem2), Subsystem2,
Box::new(Subsystem1), Subsystem1,
spawner, spawner,
).unwrap(); ).unwrap();
let overseer_fut = overseer.run().fuse(); let overseer_fut = overseer.run().fuse();
+23 -23
View File
@@ -314,7 +314,6 @@ pub type CompatibleSubsystem<M> = Box<dyn Subsystem<OverseerSubsystemContext<M>>
/// [`Subsystem`]: trait.Subsystem.html /// [`Subsystem`]: trait.Subsystem.html
#[allow(dead_code)] #[allow(dead_code)]
struct OverseenSubsystem<M> { struct OverseenSubsystem<M> {
subsystem: CompatibleSubsystem<M>,
instance: Option<SubsystemInstance<M>>, instance: Option<SubsystemInstance<M>>,
} }
@@ -407,7 +406,7 @@ where
/// where C: SubsystemContext<Message=CandidateValidationMessage> /// where C: SubsystemContext<Message=CandidateValidationMessage>
/// { /// {
/// fn start( /// fn start(
/// &mut self, /// self,
/// mut ctx: C, /// mut ctx: C,
/// ) -> SpawnedSubsystem { /// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move { /// SpawnedSubsystem(Box::pin(async move {
@@ -423,7 +422,7 @@ where
/// where C: SubsystemContext<Message=CandidateBackingMessage> /// where C: SubsystemContext<Message=CandidateBackingMessage>
/// { /// {
/// fn start( /// fn start(
/// &mut self, /// self,
/// mut ctx: C, /// mut ctx: C,
/// ) -> SpawnedSubsystem { /// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move { /// SpawnedSubsystem(Box::pin(async move {
@@ -438,8 +437,8 @@ where
/// let spawner = executor::ThreadPool::new().unwrap(); /// let spawner = executor::ThreadPool::new().unwrap();
/// let (overseer, _handler) = Overseer::new( /// let (overseer, _handler) = Overseer::new(
/// vec![], /// vec![],
/// Box::new(ValidationSubsystem), /// ValidationSubsystem,
/// Box::new(CandidateBackingSubsystem), /// CandidateBackingSubsystem,
/// spawner, /// spawner,
/// ).unwrap(); /// ).unwrap();
/// ///
@@ -458,8 +457,8 @@ where
/// ``` /// ```
pub fn new( pub fn new(
leaves: impl IntoIterator<Item = BlockInfo>, leaves: impl IntoIterator<Item = BlockInfo>,
validation: CompatibleSubsystem<CandidateValidationMessage>, validation: impl Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
candidate_backing: CompatibleSubsystem<CandidateBackingMessage>, candidate_backing: impl Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
mut s: S, mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> { ) -> SubsystemResult<(Self, OverseerHandler)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
@@ -658,7 +657,7 @@ fn spawn<S: Spawn, M: Send + 'static>(
spawner: &mut S, spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>, futures: &mut FuturesUnordered<RemoteHandle<()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>, streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
mut s: CompatibleSubsystem<M>, s: impl Subsystem<OverseerSubsystemContext<M>>,
) -> SubsystemResult<OverseenSubsystem<M>> { ) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY); let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY); let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
@@ -675,7 +674,6 @@ fn spawn<S: Spawn, M: Send + 'static>(
}); });
Ok(OverseenSubsystem { Ok(OverseenSubsystem {
subsystem: s,
instance, instance,
}) })
} }
@@ -692,8 +690,8 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem1 impl<C> Subsystem<C> for TestSubsystem1
where C: SubsystemContext<Message=CandidateValidationMessage> where C: SubsystemContext<Message=CandidateValidationMessage>
{ {
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone(); let mut sender = self.0;
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
let mut i = 0; let mut i = 0;
loop { loop {
@@ -717,8 +715,10 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem2 impl<C> Subsystem<C> for TestSubsystem2
where C: SubsystemContext<Message=CandidateBackingMessage> where C: SubsystemContext<Message=CandidateBackingMessage>
{ {
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { fn start(self, mut ctx: C) -> SpawnedSubsystem {
let sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
let _sender = sender;
let mut c: usize = 0; let mut c: usize = 0;
loop { loop {
if c < 10 { if c < 10 {
@@ -759,7 +759,7 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem4 impl<C> Subsystem<C> for TestSubsystem4
where C: SubsystemContext<Message=CandidateBackingMessage> where C: SubsystemContext<Message=CandidateBackingMessage>
{ {
fn start(&mut self, mut _ctx: C) -> SpawnedSubsystem { fn start(self, mut _ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
// Do nothing and exit. // Do nothing and exit.
})) }))
@@ -777,8 +777,8 @@ mod tests {
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
vec![], vec![],
Box::new(TestSubsystem1(s1_tx)), TestSubsystem1(s1_tx),
Box::new(TestSubsystem2(s2_tx)), TestSubsystem2(s2_tx),
spawner, spawner,
).unwrap(); ).unwrap();
let overseer_fut = overseer.run().fuse(); let overseer_fut = overseer.run().fuse();
@@ -827,8 +827,8 @@ mod tests {
let (s1_tx, _) = mpsc::channel(64); let (s1_tx, _) = mpsc::channel(64);
let (overseer, _handle) = Overseer::new( let (overseer, _handle) = Overseer::new(
vec![], vec![],
Box::new(TestSubsystem1(s1_tx)), TestSubsystem1(s1_tx),
Box::new(TestSubsystem4), TestSubsystem4,
spawner, spawner,
).unwrap(); ).unwrap();
let overseer_fut = overseer.run().fuse(); let overseer_fut = overseer.run().fuse();
@@ -846,7 +846,7 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem5 impl<C> Subsystem<C> for TestSubsystem5
where C: SubsystemContext<Message=CandidateValidationMessage> where C: SubsystemContext<Message=CandidateValidationMessage>
{ {
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone(); let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
@@ -872,7 +872,7 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem6 impl<C> Subsystem<C> for TestSubsystem6
where C: SubsystemContext<Message=CandidateBackingMessage> where C: SubsystemContext<Message=CandidateBackingMessage>
{ {
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone(); let mut sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
@@ -925,8 +925,8 @@ mod tests {
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
vec![first_block], vec![first_block],
Box::new(TestSubsystem5(tx_5)), TestSubsystem5(tx_5),
Box::new(TestSubsystem6(tx_6)), TestSubsystem6(tx_6),
spawner, spawner,
).unwrap(); ).unwrap();
@@ -1010,8 +1010,8 @@ mod tests {
// start with two forks of different height. // start with two forks of different height.
let (overseer, mut handler) = Overseer::new( let (overseer, mut handler) = Overseer::new(
vec![first_block, second_block], vec![first_block, second_block],
Box::new(TestSubsystem5(tx_5)), TestSubsystem5(tx_5),
Box::new(TestSubsystem6(tx_6)), TestSubsystem6(tx_6),
spawner, spawner,
).unwrap(); ).unwrap();
+4 -4
View File
@@ -272,7 +272,7 @@ struct CandidateValidationSubsystem;
impl<C> Subsystem<C> for CandidateValidationSubsystem impl<C> Subsystem<C> for CandidateValidationSubsystem
where C: SubsystemContext<Message = CandidateValidationMessage> where C: SubsystemContext<Message = CandidateValidationMessage>
{ {
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {} while let Ok(_) = ctx.recv().await {}
})) }))
@@ -284,7 +284,7 @@ struct CandidateBackingSubsystem;
impl<C> Subsystem<C> for CandidateBackingSubsystem impl<C> Subsystem<C> for CandidateBackingSubsystem
where C: SubsystemContext<Message = CandidateBackingMessage> where C: SubsystemContext<Message = CandidateBackingMessage>
{ {
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem { fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move { SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {} while let Ok(_) = ctx.recv().await {}
})) }))
@@ -295,8 +295,8 @@ fn real_overseer<S: futures::task::Spawn>(
leaves: impl IntoIterator<Item = BlockInfo>, leaves: impl IntoIterator<Item = BlockInfo>,
s: S, s: S,
) -> Result<(Overseer<S>, OverseerHandler), ServiceError> { ) -> Result<(Overseer<S>, OverseerHandler), ServiceError> {
let validation = Box::new(CandidateValidationSubsystem); let validation = CandidateValidationSubsystem;
let candidate_backing = Box::new(CandidateBackingSubsystem); let candidate_backing = CandidateBackingSubsystem;
Overseer::new(leaves, validation, candidate_backing, s) Overseer::new(leaves, validation, candidate_backing, s)
.map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) .map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e)))
} }
+1 -1
View File
@@ -146,5 +146,5 @@ pub trait SubsystemContext: Send + 'static {
/// [`Subsystem`]: trait.Subsystem.html /// [`Subsystem`]: trait.Subsystem.html
pub trait Subsystem<C: SubsystemContext> { pub trait Subsystem<C: SubsystemContext> {
/// Start this `Subsystem` and return `SpawnedSubsystem`. /// Start this `Subsystem` and return `SpawnedSubsystem`.
fn start(&mut self, ctx: C) -> SpawnedSubsystem; fn start(self, ctx: C) -> SpawnedSubsystem;
} }