Update to libp2p v0.20.1 (#6465)

* Update to libp2p-0.20.0

* Update to `libp2p-0.20.1`.

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
Toralf Wittner
2020-06-30 10:02:51 +02:00
committed by GitHub
parent 2e2af4b05a
commit 3de2a88075
14 changed files with 291 additions and 194 deletions
+10 -11
View File
@@ -28,7 +28,6 @@
//! events indicating what happened since the latest polling.
//!
use bytes::BytesMut;
use futures::{prelude::*, ready};
use libp2p::{core::transport::OptionalTransport, Multiaddr, Transport, wasm_ext};
use log::{trace, warn, error};
@@ -61,8 +60,8 @@ impl<T: ?Sized + Stream + Sink<I>, I> StreamAndSink<I> for T {}
type WsTrans = libp2p::core::transport::boxed::Boxed<
Pin<Box<dyn StreamAndSink<
BytesMut,
Item = Result<BytesMut, io::Error>,
Vec<u8>,
Item = Result<Vec<u8>, io::Error>,
Error = io::Error
> + Send>>,
io::Error
@@ -92,12 +91,12 @@ impl TelemetryWorker {
libp2p::websocket::framed::WsConfig::new(inner)
.and_then(|connec, _| {
let connec = connec
.with(|item: BytesMut| {
.with(|item| {
let item = libp2p::websocket::framed::OutgoingData::Binary(item);
future::ready(Ok::<_, io::Error>(item))
})
.try_filter(|item| future::ready(item.is_data()))
.map_ok(|data| BytesMut::from(data.as_ref()));
.map_ok(|data| data.into_bytes());
future::ready(Ok::<_, io::Error>(connec))
})
});
@@ -189,7 +188,7 @@ impl TelemetryWorker {
/// For some context, we put this object around the `wasm_ext::ExtTransport` in order to make sure
/// that each telemetry message maps to one single call to `write` in the WASM FFI.
#[pin_project::pin_project]
struct StreamSink<T>(#[pin] T, Option<BytesMut>);
struct StreamSink<T>(#[pin] T, Option<Vec<u8>>);
impl<T> From<T> for StreamSink<T> {
fn from(inner: T) -> StreamSink<T> {
@@ -198,15 +197,15 @@ impl<T> From<T> for StreamSink<T> {
}
impl<T: AsyncRead> Stream for StreamSink<T> {
type Item = Result<BytesMut, io::Error>;
type Item = Result<Vec<u8>, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let mut buf = [0; 128];
let mut buf = vec![0; 128];
match ready!(AsyncRead::poll_read(this.0, cx, &mut buf)) {
Ok(0) => Poll::Ready(None),
Ok(n) => {
let buf: BytesMut = buf[..n].into();
buf.truncate(n);
Poll::Ready(Some(Ok(buf)))
},
Err(err) => Poll::Ready(Some(Err(err))),
@@ -232,7 +231,7 @@ impl<T: AsyncWrite> StreamSink<T> {
}
}
impl<T: AsyncWrite> Sink<BytesMut> for StreamSink<T> {
impl<T: AsyncWrite> Sink<Vec<u8>> for StreamSink<T> {
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
@@ -240,7 +239,7 @@ impl<T: AsyncWrite> Sink<BytesMut> for StreamSink<T> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> {
fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let this = self.project();
debug_assert!(this.1.is_none());
*this.1 = Some(item);
@@ -18,7 +18,6 @@
//! Contains the `Node` struct, which handles communications with a single telemetry endpoint.
use bytes::BytesMut;
use futures::prelude::*;
use futures_timer::Delay;
use libp2p::Multiaddr;
@@ -57,7 +56,7 @@ struct NodeSocketConnected<TTrans: Transport> {
/// Where to send data.
sink: TTrans::Output,
/// Queue of packets to send.
pending: VecDeque<BytesMut>,
pending: VecDeque<Vec<u8>>,
/// If true, we need to flush the sink.
need_flush: bool,
/// A timeout for the socket to write data.
@@ -103,15 +102,15 @@ impl<TTrans: Transport> Node<TTrans> {
impl<TTrans: Transport, TSinkErr> Node<TTrans>
where TTrans: Clone + Unpin, TTrans::Dial: Unpin,
TTrans::Output: Sink<BytesMut, Error = TSinkErr>
+ Stream<Item=Result<BytesMut, TSinkErr>>
TTrans::Output: Sink<Vec<u8>, Error = TSinkErr>
+ Stream<Item=Result<Vec<u8>, TSinkErr>>
+ Unpin,
TSinkErr: fmt::Debug
{
/// Sends a WebSocket frame to the node. Returns an error if we are not connected to the node.
///
/// After calling this method, you should call `poll` in order for it to be properly processed.
pub fn send_message(&mut self, payload: impl Into<BytesMut>) -> Result<(), ()> {
pub fn send_message(&mut self, payload: impl Into<Vec<u8>>) -> Result<(), ()> {
if let NodeSocket::Connected(NodeSocketConnected { pending, .. }) = &mut self.socket {
if pending.len() <= MAX_PENDING {
trace!(target: "telemetry", "Adding log entry to queue for {:?}", self.addr);
@@ -203,8 +202,8 @@ fn gen_rand_reconnect_delay() -> Delay {
}
impl<TTrans: Transport, TSinkErr> NodeSocketConnected<TTrans>
where TTrans::Output: Sink<BytesMut, Error = TSinkErr>
+ Stream<Item=Result<BytesMut, TSinkErr>>
where TTrans::Output: Sink<Vec<u8>, Error = TSinkErr>
+ Stream<Item=Result<Vec<u8>, TSinkErr>>
+ Unpin
{
/// Processes the queue of messages for the connected socket.