be more careful about fusing in select! (#2052)

This commit is contained in:
Robert Habermeier
2020-12-02 08:52:51 -05:00
committed by GitHub
parent b13052de12
commit 709f990fb2
3 changed files with 10 additions and 9 deletions
@@ -84,8 +84,9 @@ impl CollationGenerationSubsystem {
// at any point waiting for them all, so instead, we create a channel on which they can // at any point waiting for them all, so instead, we create a channel on which they can
// send those messages. We can then just monitor the channel and forward messages on it // send those messages. We can then just monitor the channel and forward messages on it
// to the overseer here, via the context. // to the overseer here, via the context.
let (sender, mut receiver) = mpsc::channel(0); let (sender, receiver) = mpsc::channel(0);
let mut receiver = receiver.fuse();
loop { loop {
select! { select! {
incoming = ctx.recv().fuse() => { incoming = ctx.recv().fuse() => {
@@ -93,7 +94,7 @@ impl CollationGenerationSubsystem {
break; break;
} }
}, },
msg = receiver.next().fuse() => { msg = receiver.next() => {
if let Some(msg) = msg { if let Some(msg) = msg {
ctx.send_message(msg).await; ctx.send_message(msg).await;
} }
+6 -6
View File
@@ -121,8 +121,6 @@ impl ValidatedCandidateCommand {
struct CandidateBackingJob { struct CandidateBackingJob {
/// The hash of the relay parent on top of which this job is doing it's work. /// The hash of the relay parent on top of which this job is doing it's work.
parent: Hash, parent: Hash,
/// Inbound message channel receiving part.
rx_to: mpsc::Receiver<CandidateBackingMessage>,
/// Outbound message channel sending part. /// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJobCommand>, tx_from: mpsc::Sender<FromJobCommand>,
/// The `ParaId` assigned to this validator /// The `ParaId` assigned to this validator
@@ -426,7 +424,10 @@ async fn validate_and_make_available(
impl CandidateBackingJob { impl CandidateBackingJob {
/// Run asynchronously. /// Run asynchronously.
async fn run_loop(mut self) -> Result<(), Error> { async fn run_loop(
mut self,
mut rx_to: mpsc::Receiver<CandidateBackingMessage>,
) -> Result<(), Error> {
loop { loop {
futures::select! { futures::select! {
validated_command = self.background_validation.next() => { validated_command = self.background_validation.next() => {
@@ -436,7 +437,7 @@ impl CandidateBackingJob {
panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed"); panic!("`self` hasn't dropped and `self` holds a reference to this sender; qed");
} }
} }
to_job = self.rx_to.next() => match to_job { to_job = rx_to.next() => match to_job {
None => break, None => break,
Some(msg) => { Some(msg) => {
self.process_msg(msg).await?; self.process_msg(msg).await?;
@@ -917,7 +918,6 @@ impl util::JobTrait for CandidateBackingJob {
let (background_tx, background_rx) = mpsc::channel(16); let (background_tx, background_rx) = mpsc::channel(16);
let job = CandidateBackingJob { let job = CandidateBackingJob {
parent, parent,
rx_to,
tx_from, tx_from,
assignment, assignment,
required_collator, required_collator,
@@ -934,7 +934,7 @@ impl util::JobTrait for CandidateBackingJob {
metrics, metrics,
}; };
job.run_loop().await job.run_loop(rx_to).await
} }
.boxed() .boxed()
} }
@@ -695,7 +695,7 @@ pub(crate) async fn run(
let (relay_parent, validator_id, peer_id) = match res { let (relay_parent, validator_id, peer_id) = match res {
Some(res) => res, Some(res) => res,
// Will never happen, but better to be safe. // Will never happen, but better to be safe.
None => continue, None => return Ok(()),
}; };
let _timer = state.metrics.time_handle_connection_request(); let _timer = state.metrics.time_handle_connection_request();