use crate::ws_client; use futures::{Sink, SinkExt, Stream, StreamExt}; use crate::feed_message_de::FeedMessage; /// Wrap a `ws_client::Sender` with convenient utility methods for shard connections pub struct ShardSender(ws_client::Sender); impl From for ShardSender { fn from(c: ws_client::Sender) -> Self { ShardSender(c) } } impl Sink for ShardSender { type Error = ws_client::SendError; fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.0.poll_ready_unpin(cx) } fn start_send(mut self: std::pin::Pin<&mut Self>, item: ws_client::Message) -> Result<(), Self::Error> { self.0.start_send_unpin(item) } fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.0.poll_flush_unpin(cx) } fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.0.poll_close_unpin(cx) } } impl ShardSender { pub async fn send_json_binary(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> { let bytes = serde_json::to_vec(&json).expect("valid bytes"); self.send(ws_client::Message::Binary(bytes)).await } pub async fn send_json_text(&mut self, json: serde_json::Value) -> Result<(), ws_client::SendError> { let s = serde_json::to_string(&json).expect("valid string"); self.send(ws_client::Message::Text(s)).await } } /// Wrap a `ws_client::Receiver` with convenient utility methods for shard connections pub struct ShardReceiver(ws_client::Receiver); impl From for ShardReceiver { fn from(c: ws_client::Receiver) -> Self { ShardReceiver(c) } } impl Stream for ShardReceiver { type Item = Result; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.0.poll_next_unpin(cx) } } /// Wrap a `ws_client::Sender` with convenient utility methods for feed connections pub struct FeedSender(ws_client::Sender); impl From for FeedSender { fn from(c: ws_client::Sender) -> Self { FeedSender(c) } } impl Sink for FeedSender { type Error = ws_client::SendError; fn poll_ready(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.0.poll_ready_unpin(cx) } fn start_send(mut self: std::pin::Pin<&mut Self>, item: ws_client::Message) -> Result<(), Self::Error> { self.0.start_send_unpin(item) } fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.0.poll_flush_unpin(cx) } fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.0.poll_close_unpin(cx) } } impl FeedSender { pub async fn send_command>(&mut self, command: S, param: S) -> Result<(), ws_client::SendError> { self.send(ws_client::Message::Text(format!("{}:{}", command.as_ref(), param.as_ref()))).await } } /// Wrap a `ws_client::Receiver` with convenient utility methods for feed connections pub struct FeedReceiver(ws_client::Receiver); impl From for FeedReceiver { fn from(c: ws_client::Receiver) -> Self { FeedReceiver(c) } } impl Stream for FeedReceiver { type Item = Result; fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { self.0.poll_next_unpin(cx).map_err(|e| e.into()) } } impl FeedReceiver { /// Wait for the next set of feed messages to arrive. Returns an error if the connection /// is closed, or the messages that come back cannot be properly decoded. pub async fn recv_feed_messages(&mut self) -> Result, anyhow::Error> { let msg = self.0 .next() .await .ok_or_else(|| anyhow::anyhow!("Stream closed: no more messages"))??; match msg { ws_client::Message::Binary(data) => { let messages = FeedMessage::from_bytes(&data)?; Ok(messages) }, ws_client::Message::Text(text) => { let messages = FeedMessage::from_bytes(text.as_bytes())?; Ok(messages) } } } }