From 3d2931ace5c4434afd99467755a95899090fc877 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 21 Jun 2021 21:48:33 +0200 Subject: [PATCH] Remove the streamunordered crate (#3339) The functionality is now provided by the `futures` crate. --- polkadot/Cargo.lock | 13 ------------- polkadot/node/subsystem-util/Cargo.toml | 1 - polkadot/node/subsystem-util/src/lib.rs | 22 +++++++--------------- 3 files changed, 7 insertions(+), 29 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index bd41f40272..4c4f0524d1 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6385,7 +6385,6 @@ dependencies = [ "sp-application-crypto", "sp-core", "sp-keystore", - "streamunordered", "substrate-prometheus-endpoint", "thiserror", "tracing", @@ -10052,18 +10051,6 @@ dependencies = [ "generic-array 0.14.4", ] -[[package]] -name = "streamunordered" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9394ee1338fee8370bee649f8a7170b3a56917903a0956467ad192dcf8699ca" -dependencies = [ - "futures-core", - "futures-sink", - "futures-util", - "slab", -] - [[package]] name = "string" version = "0.2.1" diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index 205e71173b..63bbeee1ab 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -14,7 +14,6 @@ parity-scale-codec = { version = "2.0.0", default-features = false, features = [ parking_lot = { version = "0.11.1", optional = true } pin-project = "1.0.7" rand = "0.8.3" -streamunordered = "0.5.1" thiserror = "1.0.23" tracing = "0.1.26" lru = "0.6.5" diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index 62f86f5c08..d474a46dd1 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -31,7 +31,7 @@ use polkadot_node_subsystem::{ ActiveLeavesUpdate, OverseerSignal, }; use polkadot_node_jaeger as jaeger; -use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::Stream}; +use futures::{channel::{mpsc, oneshot}, prelude::*, select, stream::{Stream, SelectAll}}; use futures_timer::Delay; use parity_scale_codec::Encode; use pin_project::pin_project; @@ -48,7 +48,6 @@ use std::{ collections::{HashMap, hash_map::Entry}, convert::TryFrom, marker::Unpin, pin::Pin, task::{Poll, Context}, time::Duration, fmt, sync::Arc, }; -use streamunordered::{StreamUnordered, StreamYield}; use thiserror::Error; pub use metered_channel as metered; @@ -523,7 +522,7 @@ pub enum JobsError { struct Jobs { spawner: Spawner, running: HashMap>, - outgoing_msgs: StreamUnordered>, + outgoing_msgs: SelectAll>, } impl Jobs { @@ -532,7 +531,7 @@ impl Jobs { Self { spawner, running: HashMap::new(), - outgoing_msgs: StreamUnordered::new(), + outgoing_msgs: SelectAll::new(), } } @@ -608,17 +607,10 @@ where type Item = FromJobCommand; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - loop { - match Pin::new(&mut self.outgoing_msgs).poll_next(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(r) => match r.map(|v| v.0) { - Some(StreamYield::Item(msg)) => return Poll::Ready(Some(msg)), - // If a job is finished, rerun the loop - Some(StreamYield::Finished(_)) => continue, - // Don't end if there are no jobs running - None => return Poll::Pending, - } - } + match futures::ready!(Pin::new(&mut self.outgoing_msgs).poll_next(cx)) { + Some(msg) => Poll::Ready(Some(msg)), + // Don't end if there are no jobs running + None => Poll::Pending, } } }