Remove the streamunordered crate (#3339)

The functionality is now provided by the `futures` crate.
This commit is contained in:
Bastian Köcher
2021-06-21 21:48:33 +02:00
committed by GitHub
parent 74baed8b39
commit 3d2931ace5
3 changed files with 7 additions and 29 deletions
+7 -15
View File
@@ -31,7 +31,7 @@ use polkadot_node_subsystem::{
ActiveLeavesUpdate, OverseerSignal,
};
use polkadot_node_jaeger as jaeger;
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream};
use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::{Stream, SelectAll}};
use futures_timer::Delay;
use parity_scale_codec::Encode;
use pin_project::pin_project;
@@ -48,7 +48,6 @@ use std::{
collections::{HashMap, hash_map::Entry}, convert::TryFrom, marker::Unpin, pin::Pin, task::{Poll, Context},
time::Duration, fmt, sync::Arc,
};
use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error;
pub use metered_channel as metered;
@@ -523,7 +522,7 @@ pub enum JobsError<JobError: std::fmt::Debug + std::error::Error + 'static> {
struct Jobs<Spawner, ToJob> {
spawner: Spawner,
running: HashMap<Hash, JobHandle<ToJob>>,
outgoing_msgs: StreamUnordered<mpsc::Receiver<FromJobCommand>>,
outgoing_msgs: SelectAll<mpsc::Receiver<FromJobCommand>>,
}
impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> {
@@ -532,7 +531,7 @@ impl<Spawner: SpawnNamed, ToJob: Send + 'static> Jobs<Spawner, ToJob> {
Self {
spawner,
running: HashMap::new(),
outgoing_msgs: StreamUnordered::new(),
outgoing_msgs: SelectAll::new(),
}
}
@@ -608,17 +607,10 @@ where
type Item = FromJobCommand;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
match Pin::new(&mut self.outgoing_msgs).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(r) => match r.map(|v| v.0) {
Some(StreamYield::Item(msg)) => return Poll::Ready(Some(msg)),
// If a job is finished, rerun the loop
Some(StreamYield::Finished(_)) => continue,
// Don't end if there are no jobs running
None => return Poll::Pending,
}
}
match futures::ready!(Pin::new(&mut self.outgoing_msgs).poll_next(cx)) {
Some(msg) => Poll::Ready(Some(msg)),
// Don't end if there are no jobs running
None => Poll::Pending,
}
}
}