diff --git a/backend/Cargo.lock b/backend/Cargo.lock
index c9235fe..9de9792 100644
--- a/backend/Cargo.lock
+++ b/backend/Cargo.lock
@@ -202,6 +202,7 @@ dependencies = [
"bimap",
"bincode",
"bytes",
+ "flume",
"fnv",
"futures",
"hex",
@@ -1607,6 +1608,7 @@ dependencies = [
"anyhow",
"bincode",
"common",
+ "flume",
"futures",
"hex",
"http",
@@ -1644,6 +1646,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"common",
+ "flume",
"futures",
"http",
"log",
diff --git a/backend/common/Cargo.toml b/backend/common/Cargo.toml
index a6fcd58..a0ccdfa 100644
--- a/backend/common/Cargo.toml
+++ b/backend/common/Cargo.toml
@@ -10,6 +10,7 @@ anyhow = "1.0.42"
base64 = { default-features = false, features = ["alloc"], version = "0.13" }
bimap = "0.6.1"
bytes = "1.0.1"
+flume = "0.10.8"
fnv = "1.0.7"
futures = "0.3.15"
hex = "0.4.3"
diff --git a/backend/common/src/ws_client/connect.rs b/backend/common/src/ws_client/connect.rs
index 0ec6256..787a88b 100644
--- a/backend/common/src/ws_client/connect.rs
+++ b/backend/common/src/ws_client/connect.rs
@@ -14,8 +14,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::on_close::OnClose;
-use futures::channel::mpsc;
-use futures::{SinkExt, StreamExt};
use soketto::handshake::{Client, ServerResponse};
use std::sync::Arc;
use tokio::net::TcpStream;
@@ -73,7 +71,7 @@ impl Connection {
let mut rx_closed2 = tx_closed1.subscribe();
// Receive messages from the socket:
- let (mut tx_to_external, rx_from_ws) = mpsc::unbounded();
+ let (tx_to_external, rx_from_ws) = flume::unbounded();
tokio::spawn(async move {
let mut send_to_external = true;
loop {
@@ -112,7 +110,7 @@ impl Connection {
.map_err(|e| e.into()),
};
- if let Err(e) = tx_to_external.send(msg).await {
+ if let Err(e) = tx_to_external.send_async(msg).await {
// Our external channel may have closed or errored, but the socket hasn't
// been closed, so keep receiving in order to allow the socket to continue to
// function properly (we may be happy just sending messages to it), but stop
@@ -124,12 +122,12 @@ impl Connection {
});
// Send messages to the socket:
- let (tx_to_ws, mut rx_from_external) = mpsc::unbounded();
+ let (tx_to_ws, rx_from_external) = flume::unbounded::<SentMessage>();
tokio::spawn(async move {
loop {
// Wait for messages, or bail entirely if asked to close.
let msg = tokio::select! {
- msg = rx_from_external.next() => { msg },
+ msg = rx_from_external.recv_async() => { msg },
_ = rx_closed2.recv() => {
// attempt to gracefully end the connection.
let _ = ws_to_connection.close().await;
@@ -141,8 +139,8 @@ impl Connection {
// needs to keep receiving data for the WS connection to stay open, there's no
// reason to keep this side of the loop open if our channel is closed.
let msg = match msg {
- None => break,
- Some(msg) => msg,
+ Ok(msg) => msg,
+ _ => break,
};
// We don't explicitly shut down the channel if we hit send errors. Why? Because the
@@ -207,7 +205,7 @@ impl Connection {
closer: Arc::clone(&on_close),
},
Receiver {
- inner: rx_from_ws,
+ inner: rx_from_ws.into_stream(),
closer: on_close,
},
)
diff --git a/backend/common/src/ws_client/receiver.rs b/backend/common/src/ws_client/receiver.rs
index 6f45da7..e1e2397 100644
--- a/backend/common/src/ws_client/receiver.rs
+++ b/backend/common/src/ws_client/receiver.rs
@@ -15,13 +15,12 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::on_close::OnClose;
-use futures::channel::mpsc;
use futures::{Stream, StreamExt};
use std::sync::Arc;
/// Receive messages out of a connection
pub struct Receiver {
- pub(super) inner: mpsc::UnboundedReceiver<Result<RecvMessage, RecvError>>,
+ pub(super) inner: flume::r#async::RecvStream<'static, Result<RecvMessage, RecvError>>,
pub(super) closer: Arc,
}
diff --git a/backend/common/src/ws_client/sender.rs b/backend/common/src/ws_client/sender.rs
index 45c3d66..9cb8ee7 100644
--- a/backend/common/src/ws_client/sender.rs
+++ b/backend/common/src/ws_client/sender.rs
@@ -15,8 +15,6 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::on_close::OnClose;
-use futures::channel::mpsc;
-use futures::{Sink, SinkExt};
use std::sync::Arc;
/// A message that can be sent into the channel interface
@@ -41,62 +39,36 @@ pub enum SentMessage {
/// Send messages into the connection
#[derive(Clone)]
pub struct Sender {
- pub(super) inner: mpsc::UnboundedSender<SentMessage>,
+ pub(super) inner: flume::Sender<SentMessage>,
pub(super) closer: Arc,
}
impl Sender {
/// Ask the underlying Websocket connection to close.
- pub async fn close(&mut self) -> Result<(), SendError> {
+ pub async fn close(&mut self) -> Result<(), SendError> {
self.closer.0.send(()).map_err(|_| SendError::CloseError)?;
Ok(())
}
/// Returns whether this channel is closed.
pub fn is_closed(&self) -> bool {
- self.inner.is_closed()
+ self.inner.is_disconnected()
}
/// Unbounded send will always queue the message and doesn't
/// need to be awaited.
- pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), SendError> {
- self.inner
- .unbounded_send(msg)
- .map_err(|e| e.into_send_error())?;
+ pub fn unbounded_send(&self, msg: SentMessage) -> Result<(), flume::SendError<SentMessage>> {
+ self.inner.send(msg)?;
Ok(())
}
+ /// Convert this sender into a Sink
+ pub fn into_sink(self) -> impl futures::Sink<SentMessage, Error = flume::SendError<SentMessage>> + std::marker::Unpin + Clone + 'static {
+ self.inner.into_sink()
+ }
}
#[derive(thiserror::Error, Debug, Clone)]
-pub enum SendError {
+pub enum SendError {
#[error("Failed to send message: {0}")]
- ChannelError(#[from] mpsc::SendError),
+ ChannelError(#[from] flume::SendError<SentMessage>),
#[error("Failed to send close message")]
CloseError,
}
-
-impl Sink for Sender {
- type Error = SendError;
- fn poll_ready(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.inner.poll_ready_unpin(cx).map_err(|e| e.into())
- }
- fn start_send(
- mut self: std::pin::Pin<&mut Self>,
- item: SentMessage,
- ) -> Result<(), Self::Error> {
- self.inner.start_send_unpin(item).map_err(|e| e.into())
- }
- fn poll_flush(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.inner.poll_flush_unpin(cx).map_err(|e| e.into())
- }
- fn poll_close(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.inner.poll_close_unpin(cx).map_err(|e| e.into())
- }
-}
diff --git a/backend/telemetry_core/src/aggregator/aggregator_set.rs b/backend/telemetry_core/src/aggregator/aggregator_set.rs
index 113eb15..885c119 100644
--- a/backend/telemetry_core/src/aggregator/aggregator_set.rs
+++ b/backend/telemetry_core/src/aggregator/aggregator_set.rs
@@ -1,7 +1,7 @@
use super::aggregator::{Aggregator, AggregatorOpts};
use super::inner_loop;
use common::EitherSink;
-use futures::{Sink, SinkExt, StreamExt};
+use futures::{Sink, SinkExt};
use inner_loop::FromShardWebsocket;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -53,12 +53,10 @@ impl AggregatorSet {
.collect();
let (tx, rx) = flume::unbounded::<FromShardWebsocket>();
- let mut rx = rx.into_stream();
- let tx = tx.into_sink();
// Send every incoming message to all aggregators.
tokio::spawn(async move {
- while let Some(msg) = rx.next().await {
+ while let Ok(msg) = rx.recv_async().await {
for conn in &mut conns {
// Unbounded channel under the hood, so this await
// shouldn't ever need to yield.
@@ -70,7 +68,7 @@ impl AggregatorSet {
}
});
- EitherSink::b(tx.sink_map_err(|e| anyhow::anyhow!("{}", e)))
+ EitherSink::b(tx.into_sink().sink_map_err(|e| anyhow::anyhow!("{}", e)))
}
/// Return a sink that a feed can send messages into to be handled by a single aggregator.
diff --git a/backend/telemetry_core/src/aggregator/inner_loop.rs b/backend/telemetry_core/src/aggregator/inner_loop.rs
index 88cb402..9ae511e 100644
--- a/backend/telemetry_core/src/aggregator/inner_loop.rs
+++ b/backend/telemetry_core/src/aggregator/inner_loop.rs
@@ -25,7 +25,6 @@ use common::{
node_types::BlockHash,
time,
};
-use futures::StreamExt;
use std::collections::{HashMap, HashSet};
use std::{
net::{IpAddr, Ipv4Addr},
@@ -184,8 +183,7 @@ impl InnerLoop {
// check the length of the queue below to decide whether or not to
// pass the message on to this.
tokio::spawn(async move {
- let mut metered_rx = metered_rx.into_stream();
- while let Some(msg) = metered_rx.next().await {
+ while let Ok(msg) = metered_rx.recv_async().await {
match msg {
ToAggregator::FromFeedWebsocket(feed_conn_id, msg) => {
self.handle_from_feed(feed_conn_id, msg)
@@ -215,8 +213,7 @@ impl InnerLoop {
});
});
- let mut rx_from_external = rx_from_external.into_stream();
- while let Some(msg) = rx_from_external.next().await {
+ while let Ok(msg) = rx_from_external.recv_async().await {
// ignore node updates if we have too many messages to handle, in an attempt
// to reduce the queue length back to something reasonable, lest it get out of
// control and start consuming a load of memory.
diff --git a/backend/telemetry_core/src/find_location.rs b/backend/telemetry_core/src/find_location.rs
index 326bff1..f020747 100644
--- a/backend/telemetry_core/src/find_location.rs
+++ b/backend/telemetry_core/src/find_location.rs
@@ -17,7 +17,7 @@
use std::net::Ipv4Addr;
use std::sync::Arc;
-use futures::{Sink, SinkExt, StreamExt};
+use futures::{Sink, SinkExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use serde::Deserialize;
@@ -36,7 +36,6 @@ where
Id: Clone + Send + 'static,
{
let (tx, rx) = flume::unbounded();
- let mut rx = rx.into_stream();
// cache entries
let mut cache: FxHashMap>> = FxHashMap::default();
@@ -61,7 +60,7 @@ where
let semaphore = Arc::new(Semaphore::new(4));
loop {
- while let Some((id, ip_address)) = rx.next().await {
+ while let Ok((id, ip_address)) = rx.recv_async().await {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let mut response_chan = response_chan.clone();
let locator = locator.clone();
diff --git a/backend/telemetry_shard/Cargo.toml b/backend/telemetry_shard/Cargo.toml
index 2f9d505..370d739 100644
--- a/backend/telemetry_shard/Cargo.toml
+++ b/backend/telemetry_shard/Cargo.toml
@@ -9,6 +9,7 @@ license = "GPL-3.0"
anyhow = "1.0.41"
bincode = "1.3.3"
common = { path = "../common" }
+flume = "0.10.8"
futures = "0.3.15"
hex = "0.4.3"
http = "0.2.4"
diff --git a/backend/telemetry_shard/src/aggregator.rs b/backend/telemetry_shard/src/aggregator.rs
index 1200240..50ded1d 100644
--- a/backend/telemetry_shard/src/aggregator.rs
+++ b/backend/telemetry_shard/src/aggregator.rs
@@ -21,8 +21,7 @@ use common::{
node_types::BlockHash,
AssignId,
};
-use futures::channel::mpsc;
-use futures::{Sink, SinkExt, StreamExt};
+use futures::{Sink, SinkExt};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
@@ -60,7 +59,7 @@ pub enum FromWebsocket {
/// the websocket connection and force the node to reconnect
/// so that it sends its system info again incase the telemetry
/// core has restarted.
- close_connection: mpsc::Sender<()>,
+ close_connection: flume::Sender<()>,
},
/// Tell the aggregator about a new node.
Add {
@@ -94,28 +93,28 @@ struct AggregatorInternal {
/// Send messages to the aggregator from websockets via this. This is
/// stored here so that anybody holding an `Aggregator` handle can
/// make use of it.
- tx_to_aggregator: mpsc::Sender<ToAggregator>,
+ tx_to_aggregator: flume::Sender<ToAggregator>,
}
impl Aggregator {
/// Spawn a new Aggregator. This connects to the telemetry backend
pub async fn spawn(telemetry_uri: http::Uri) -> anyhow::Result {
- let (tx_to_aggregator, rx_from_external) = mpsc::channel(10);
+ let (tx_to_aggregator, rx_from_external) = flume::bounded(10);
// Establish a resiliant connection to the core (this retries as needed):
- let (tx_to_telemetry_core, mut rx_from_telemetry_core) =
+ let (tx_to_telemetry_core, rx_from_telemetry_core) =
create_ws_connection_to_core(telemetry_uri).await;
// Forward messages from the telemetry core into the aggregator:
- let mut tx_to_aggregator2 = tx_to_aggregator.clone();
+ let tx_to_aggregator2 = tx_to_aggregator.clone();
tokio::spawn(async move {
- while let Some(msg) = rx_from_telemetry_core.next().await {
+ while let Ok(msg) = rx_from_telemetry_core.recv_async().await {
let msg_to_aggregator = match msg {
Message::Connected => ToAggregator::ConnectedToTelemetryCore,
Message::Disconnected => ToAggregator::DisconnectedFromTelemetryCore,
Message::Data(data) => ToAggregator::FromTelemetryCore(data),
};
- if let Err(_) = tx_to_aggregator2.send(msg_to_aggregator).await {
+ if let Err(_) = tx_to_aggregator2.send_async(msg_to_aggregator).await {
// This will close the ws channels, which themselves log messages.
break;
}
@@ -139,8 +138,8 @@ impl Aggregator {
// in to the aggregator. If nobody is holding the tx side of the channel
// any more, this task will gracefully end.
async fn handle_messages(
- mut rx_from_external: mpsc::Receiver<ToAggregator>,
- mut tx_to_telemetry_core: mpsc::Sender<internal_messages::FromShardAggregator>,
+ rx_from_external: flume::Receiver<ToAggregator>,
+ tx_to_telemetry_core: flume::Sender<internal_messages::FromShardAggregator>,
) {
use internal_messages::{FromShardAggregator, FromTelemetryCore};
@@ -150,7 +149,7 @@ impl Aggregator {
// A list of close channels for the currently connected substrate nodes. Send an empty
// tuple to these to ask the connections to be closed.
- let mut close_connections: HashMap<ConnId, mpsc::Sender<()>> = HashMap::new();
+ let mut close_connections: HashMap<ConnId, flume::Sender<()>> = HashMap::new();
// Maintain mappings from the connection ID and node message ID to the "local ID" which we
// broadcast to the telemetry core.
@@ -160,15 +159,15 @@ impl Aggregator {
let mut muted: HashSet = HashSet::new();
// Now, loop and receive messages to handle.
- while let Some(msg) = rx_from_external.next().await {
+ while let Ok(msg) = rx_from_external.recv_async().await {
match msg {
ToAggregator::ConnectedToTelemetryCore => {
// Take hold of the connection closers and run them all.
let closers = close_connections;
- for (_, mut closer) in closers {
+ for (_, closer) in closers {
// if this fails, it probably means the connection has died already anyway.
- let _ = closer.send(()).await;
+ let _ = closer.send_async(()).await;
}
// We've told everything to disconnect. Now, reset our state:
@@ -212,7 +211,7 @@ impl Aggregator {
// Send the message to the telemetry core with this local ID:
let _ = tx_to_telemetry_core
- .send(FromShardAggregator::AddNode {
+ .send_async(FromShardAggregator::AddNode {
ip,
node,
genesis_hash,
@@ -245,7 +244,7 @@ impl Aggregator {
// Send the message to the telemetry core with this local ID:
let _ = tx_to_telemetry_core
- .send(FromShardAggregator::UpdateNode { local_id, payload })
+ .send_async(FromShardAggregator::UpdateNode { local_id, payload })
.await;
}
ToAggregator::FromWebsocket(disconnected_conn_id, FromWebsocket::Disconnected) => {
@@ -264,7 +263,7 @@ impl Aggregator {
to_local_id.remove_by_id(local_id);
muted.remove(&local_id);
let _ = tx_to_telemetry_core
- .send(FromShardAggregator::RemoveNode { local_id })
+ .send_async(FromShardAggregator::RemoveNode { local_id })
.await;
}
}
@@ -293,6 +292,7 @@ impl Aggregator {
// but pinning by boxing is the easy solution for now:
Box::pin(
tx_to_aggregator
+ .into_sink()
.with(move |msg| async move { Ok(ToAggregator::FromWebsocket(conn_id, msg)) }),
)
}
diff --git a/backend/telemetry_shard/src/connection.rs b/backend/telemetry_shard/src/connection.rs
index 4ebf39d..4b24bee 100644
--- a/backend/telemetry_shard/src/connection.rs
+++ b/backend/telemetry_shard/src/connection.rs
@@ -16,8 +16,7 @@
use bincode::Options;
use common::ws_client;
-use futures::channel::mpsc;
-use futures::{SinkExt, StreamExt};
+use futures::StreamExt;
#[derive(Clone, Debug)]
pub enum Message<Out> {
@@ -36,13 +35,13 @@ pub enum Message {
/// between aggregator and core.
pub async fn create_ws_connection_to_core<In, Out>(
telemetry_uri: http::Uri,
-) -> (mpsc::Sender<In>, mpsc::Receiver<Message<Out>>)
+) -> (flume::Sender<In>, flume::Receiver<Message<Out>>)
where
In: serde::Serialize + Send + 'static,
Out: serde::de::DeserializeOwned + Send + 'static,
{
- let (tx_in, mut rx_in) = mpsc::channel(10);
- let (mut tx_out, rx_out) = mpsc::channel(10);
+ let (tx_in, rx_in) = flume::bounded::<In>(10);
+ let (tx_out, rx_out) = flume::bounded(10);
let mut is_connected = false;
@@ -51,7 +50,7 @@ where
// Throw away any pending messages from the incoming channel so that it
// doesn't get filled up and begin blocking while we're looping and waiting
// for a reconnection.
- while let Ok(Some(_)) = rx_in.try_next() {}
+ while let Ok(_) = rx_in.try_recv() {}
// Try to connect. If connection established, we serialize and forward messages
// to/from the core. If the external channels break, we end for good. If the internal
@@ -60,9 +59,9 @@ where
Ok(connection) => {
let (tx_to_core, mut rx_from_core) = connection.into_channels();
is_connected = true;
- let mut tx_out = tx_out.clone();
+ let tx_out = tx_out.clone();
- if let Err(e) = tx_out.send(Message::Connected).await {
+ if let Err(e) = tx_out.send_async(Message::Connected).await {
// If receiving end is closed, bail now.
log::warn!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
@@ -73,35 +72,31 @@ where
tokio::select! {
msg = rx_from_core.next() => {
let msg = match msg {
- Some(msg) => msg,
+ Some(Ok(msg)) => msg,
// No more messages from core? core WS is disconnected.
- None => {
+ _ => {
log::warn!("No more messages from core: shutting down connection (will reconnect)");
break
}
};
let bytes = match msg {
- Ok(ws_client::RecvMessage::Binary(bytes)) => bytes,
- Ok(ws_client::RecvMessage::Text(s)) => s.into_bytes(),
- Err(e) => {
- log::warn!("Unable to receive message from core: shutting down connection (will reconnect): {}", e);
- break;
- }
+ ws_client::RecvMessage::Binary(bytes) => bytes,
+ ws_client::RecvMessage::Text(s) => s.into_bytes()
};
let msg = bincode::options()
.deserialize(&bytes)
.expect("internal messages must be deserializable");
- if let Err(e) = tx_out.send(Message::Data(msg)).await {
+ if let Err(e) = tx_out.send_async(Message::Data(msg)).await {
log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
}
},
- msg = rx_in.next() => {
+ msg = rx_in.recv_async() => {
let msg = match msg {
- Some(msg) => msg,
- None => {
+ Ok(msg) => msg,
+ Err(flume::RecvError::Disconnected) => {
log::error!("Aggregator is no longer sending messages to core; disconnecting (permanently)");
return
}
@@ -131,7 +126,7 @@ where
if is_connected {
is_connected = false;
- if let Err(e) = tx_out.send(Message::Disconnected).await {
+ if let Err(e) = tx_out.send_async(Message::Disconnected).await {
log::error!("Aggregator is no longer receiving messages from core; disconnecting (permanently): {}", e);
return;
}
diff --git a/backend/telemetry_shard/src/main.rs b/backend/telemetry_shard/src/main.rs
index 4e62d76..86f632c 100644
--- a/backend/telemetry_shard/src/main.rs
+++ b/backend/telemetry_shard/src/main.rs
@@ -29,7 +29,7 @@ use common::byte_size::ByteSize;
use common::http_utils;
use common::node_message;
use common::rolling_total::RollingTotalBuilder;
-use futures::{channel::mpsc, SinkExt, StreamExt};
+use futures::SinkExt;
use http::Uri;
use hyper::{Method, Response};
use simple_logger::SimpleLogger;
@@ -203,7 +203,7 @@ where
// This could be a oneshot channel, but it's useful to be able to clone
// messages, and we can't clone oneshot channel senders.
- let (close_connection_tx, mut close_connection_rx) = mpsc::channel(0);
+ let (close_connection_tx, close_connection_rx) = flume::bounded(1);
// Tell the aggregator about this new connection, and give it a way to close this connection:
let init_msg = FromWebsocket::Initialize {
@@ -223,7 +223,7 @@ where
tokio::select! {
// The close channel has fired, so end the loop. `ws_recv.receive_data` is
// *not* cancel safe, but since we're closing the connection we don't care.
- _ = close_connection_rx.next() => {
+ _ = close_connection_rx.recv_async() => {
log::info!("connection to {:?} being closed by aggregator", real_addr);
break
},
diff --git a/backend/test_utils/Cargo.toml b/backend/test_utils/Cargo.toml
index e3800c7..7dccdae 100644
--- a/backend/test_utils/Cargo.toml
+++ b/backend/test_utils/Cargo.toml
@@ -18,3 +18,4 @@ tokio = { version = "1.7.1", features = ["full"] }
tokio-util = { version = "0.6.7", features = ["full"] }
common = { path = "../common" }
time = { version = "0.3.0", features = ["formatting"] }
+flume = "0.10.8"
diff --git a/backend/test_utils/src/server/channels.rs b/backend/test_utils/src/server/channels.rs
index f130f7e..4ca9274 100644
--- a/backend/test_utils/src/server/channels.rs
+++ b/backend/test_utils/src/server/channels.rs
@@ -21,7 +21,7 @@ use std::{
use crate::feed_message_de::FeedMessage;
use common::ws_client;
-use futures::{Sink, SinkExt, Stream, StreamExt};
+use futures::{Stream, StreamExt};
/// Wrap a `ws_client::Sender` with convenient utility methods for shard connections
pub struct ShardSender(ws_client::Sender);
impl From<ws_client::Sender> for ShardSender {
}
}
-impl Sink for ShardSender {
- type Error = ws_client::SendError;
- fn poll_ready(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.0.poll_ready_unpin(cx)
- }
- fn start_send(
- mut self: std::pin::Pin<&mut Self>,
- item: ws_client::SentMessage,
- ) -> Result<(), Self::Error> {
- self.0.start_send_unpin(item)
- }
- fn poll_flush(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.0.poll_flush_unpin(cx)
- }
- fn poll_close(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.0.poll_close_unpin(cx)
- }
-}
-
impl ShardSender {
/// Send JSON as a binary websocket message
pub fn send_json_binary(
&mut self,
json: serde_json::Value,
- ) -> Result<(), ws_client::SendError> {
+ ) -> Result<(), flume::SendError<ws_client::SentMessage>> {
let bytes = serde_json::to_vec(&json).expect("valid bytes");
self.unbounded_send(ws_client::SentMessage::Binary(bytes))
}
/// Send JSON as a textual websocket message
- pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> {
+ pub fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), flume::SendError<ws_client::SentMessage>> {
let s = serde_json::to_string(&json).expect("valid string");
self.unbounded_send(ws_client::SentMessage::Text(s))
}
impl From<ws_client::Sender> for FeedSender {
}
}
-impl Sink for FeedSender {
- type Error = ws_client::SendError;
- fn poll_ready(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.0.poll_ready_unpin(cx)
- }
- fn start_send(
- mut self: std::pin::Pin<&mut Self>,
- item: ws_client::SentMessage,
- ) -> Result<(), Self::Error> {
- self.0.start_send_unpin(item)
- }
- fn poll_flush(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.0.poll_flush_unpin(cx)
- }
- fn poll_close(
- mut self: std::pin::Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll> {
- self.0.poll_close_unpin(cx)
- }
-}
-
impl Deref for FeedSender {
type Target = ws_client::Sender;
fn deref(&self) -> &Self::Target {
@@ -176,7 +120,7 @@ impl FeedSender {
&self,
command: S,
param: S,
- ) -> Result<(), ws_client::SendError> {
+ ) -> Result<(), flume::SendError<ws_client::SentMessage>> {
self.unbounded_send(ws_client::SentMessage::Text(format!(
"{}:{}",
command.as_ref(),