mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-25 02:27:58 +00:00
Update smoldot to 0.12 (#1212)
* Update lightclient Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * testing: Fix typo Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * testing: Update cargo.toml Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * lightclient: Add tracing logs to improve debugging Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * lightclient: Add socket buffers module for `PlatformRef` Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * lightclient: Update `SubxtPlatform` Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * cargo: Add lightclient dependencies Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * Update cargo.lock of wasm tests Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * lightclient: Add constant for with-buffer module Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * lightclient: Replace rand crate with getrandom Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * example: Update cargo lock file Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> * examples: Update deps Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> --------- Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io> Co-authored-by: Tadeo Hepperle <62739623+tadeohepperle@users.noreply.github.com>
This commit is contained in:
@@ -5,20 +5,10 @@
|
||||
//! Wasm implementation for the light client's platform using
|
||||
//! custom websockets.
|
||||
|
||||
use core::time::Duration;
|
||||
use futures_util::{future, FutureExt};
|
||||
use smoldot::libp2p::multiaddr::ProtocolRef;
|
||||
use smoldot_light::platform::{ConnectError, PlatformConnection};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
net::{IpAddr, SocketAddr},
|
||||
};
|
||||
|
||||
use super::wasm_socket::WasmSocket;
|
||||
|
||||
pub fn spawn(task: future::BoxFuture<'static, ()>) {
|
||||
wasm_bindgen_futures::spawn_local(task);
|
||||
}
|
||||
use core::time::Duration;
|
||||
use futures_util::{future, FutureExt};
|
||||
|
||||
pub fn now_from_unix_epoch() -> Duration {
|
||||
instant::SystemTime::now()
|
||||
@@ -40,87 +30,381 @@ pub fn sleep(duration: Duration) -> Delay {
|
||||
futures_timer::Delay::new(duration).boxed()
|
||||
}
|
||||
|
||||
pub struct Stream {
|
||||
pub socket: WasmSocket,
|
||||
/// Read and write buffers of the connection, or `None` if the socket has been reset.
|
||||
pub buffers: Option<(StreamReadBuffer, StreamWriteBuffer)>,
|
||||
}
|
||||
/// Implementation detail of a stream from the `SubxtPlatform`.
|
||||
#[pin_project::pin_project]
|
||||
pub struct Stream(#[pin] pub with_buffers::WithBuffers<WasmSocket>);
|
||||
|
||||
pub enum StreamReadBuffer {
|
||||
Open {
|
||||
buffer: Vec<u8>,
|
||||
cursor: std::ops::Range<usize>,
|
||||
},
|
||||
Closed,
|
||||
}
|
||||
pub mod with_buffers {
|
||||
use smoldot::libp2p::read_write;
|
||||
|
||||
pub enum StreamWriteBuffer {
|
||||
Open {
|
||||
buffer: VecDeque<u8>,
|
||||
must_flush: bool,
|
||||
must_close: bool,
|
||||
},
|
||||
Closed,
|
||||
}
|
||||
|
||||
pub async fn connect<'a>(
|
||||
proto1: ProtocolRef<'a>,
|
||||
proto2: ProtocolRef<'a>,
|
||||
proto3: Option<ProtocolRef<'a>>,
|
||||
) -> Result<PlatformConnection<Stream, std::convert::Infallible>, ConnectError> {
|
||||
// Ensure ahead of time that the multiaddress is supported.
|
||||
let addr = match (&proto1, &proto2, &proto3) {
|
||||
(ProtocolRef::Ip4(ip), ProtocolRef::Tcp(port), Some(ProtocolRef::Ws)) => {
|
||||
let addr = SocketAddr::new(IpAddr::V4((*ip).into()), *port);
|
||||
format!("ws://{}", addr.to_string())
|
||||
}
|
||||
(ProtocolRef::Ip6(ip), ProtocolRef::Tcp(port), Some(ProtocolRef::Ws)) => {
|
||||
let addr = SocketAddr::new(IpAddr::V6((*ip).into()), *port);
|
||||
format!("ws://{}", addr.to_string())
|
||||
}
|
||||
(
|
||||
ProtocolRef::Dns(addr) | ProtocolRef::Dns4(addr) | ProtocolRef::Dns6(addr),
|
||||
ProtocolRef::Tcp(port),
|
||||
Some(ProtocolRef::Ws),
|
||||
) => {
|
||||
format!("ws://{}:{}", addr.to_string(), port)
|
||||
}
|
||||
(
|
||||
ProtocolRef::Dns(addr) | ProtocolRef::Dns4(addr) | ProtocolRef::Dns6(addr),
|
||||
ProtocolRef::Tcp(port),
|
||||
Some(ProtocolRef::Wss),
|
||||
) => {
|
||||
format!("wss://{}:{}", addr.to_string(), port)
|
||||
}
|
||||
_ => {
|
||||
return Err(ConnectError {
|
||||
is_bad_addr: true,
|
||||
message: "Unknown protocols combination".to_string(),
|
||||
})
|
||||
}
|
||||
use core::{
|
||||
fmt, future, mem, ops,
|
||||
pin::{self, Pin},
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
tracing::debug!("Connecting to addr={addr}");
|
||||
use crate::platform::wasm_helpers::Instant;
|
||||
use futures_util::{AsyncRead, AsyncWrite};
|
||||
use std::io;
|
||||
/// Holds an implementation of `AsyncRead` and `AsyncWrite`, alongside with a read buffer and a
|
||||
/// write buffer.
|
||||
#[pin_project::pin_project]
|
||||
pub struct WithBuffers<T> {
|
||||
/// Actual socket to read from/write to.
|
||||
#[pin]
|
||||
socket: T,
|
||||
/// Error that has happened on the socket, if any.
|
||||
error: Option<io::Error>,
|
||||
/// Storage for data read from the socket. The first [`WithBuffers::read_buffer_valid`] bytes
|
||||
/// contain actual socket data, while the rest contains garbage data.
|
||||
/// The capacity of this buffer is at least equal to the amount of bytes requested by the
|
||||
/// inner data consumer.
|
||||
read_buffer: Vec<u8>,
|
||||
/// Number of bytes of data in [`WithBuffers::read_buffer`] that contain actual data.
|
||||
read_buffer_valid: usize,
|
||||
read_buffer_reasonable_capacity: usize,
|
||||
/// True if reading from the socket has returned `Ok(0)` earlier, in other words "end of
|
||||
/// file".
|
||||
read_closed: bool,
|
||||
/// Storage for data to write to the socket.
|
||||
write_buffers: Vec<Vec<u8>>,
|
||||
/// True if the consumer has closed the writing side earlier.
|
||||
write_closed: bool,
|
||||
/// True if the consumer has closed the writing side earlier, and the socket still has to
|
||||
/// be closed.
|
||||
close_pending: bool,
|
||||
/// True if data has been written on the socket and the socket needs to be flushed.
|
||||
flush_pending: bool,
|
||||
|
||||
let socket = WasmSocket::new(addr.as_str()).map_err(|err| ConnectError {
|
||||
is_bad_addr: false,
|
||||
message: format!("Failed to reach peer: {err}"),
|
||||
})?;
|
||||
/// Value of [`read_write::ReadWrite::now`] that was fed by the latest call to
|
||||
/// [`WithBuffers::read_write_access`].
|
||||
read_write_now: Option<Instant>,
|
||||
/// Value of [`read_write::ReadWrite::wake_up_after`] produced by the latest call
|
||||
/// to [`WithBuffers::read_write_access`].
|
||||
read_write_wake_up_after: Option<Instant>,
|
||||
}
|
||||
|
||||
Ok(PlatformConnection::SingleStreamMultistreamSelectNoiseYamux(
|
||||
Stream {
|
||||
socket,
|
||||
buffers: Some((
|
||||
StreamReadBuffer::Open {
|
||||
buffer: vec![0; 16384],
|
||||
cursor: 0..0,
|
||||
const BUFFER_CAPACITY: usize = 65536;
|
||||
const WRITE_BYTES_QUEUEABLE: usize = 128 * 1024;
|
||||
|
||||
impl<T> WithBuffers<T> {
|
||||
/// Initializes a new [`WithBuffers`] with the given socket.
|
||||
///
|
||||
/// The socket must still be open in both directions.
|
||||
pub fn new(socket: T) -> Self {
|
||||
WithBuffers {
|
||||
socket,
|
||||
error: None,
|
||||
read_buffer: Vec::with_capacity(BUFFER_CAPACITY),
|
||||
read_buffer_valid: 0,
|
||||
read_buffer_reasonable_capacity: BUFFER_CAPACITY,
|
||||
read_closed: false,
|
||||
write_buffers: Vec::with_capacity(64),
|
||||
write_closed: false,
|
||||
close_pending: false,
|
||||
flush_pending: false,
|
||||
read_write_now: None,
|
||||
read_write_wake_up_after: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an object that implements `Deref<Target = ReadWrite>`. This object can be used
|
||||
/// to push or pull data to/from the socket.
|
||||
///
|
||||
/// > **Note**: The parameter requires `Self` to be pinned for consistency with
|
||||
/// > [`WithBuffers::wait_read_write_again`].
|
||||
pub fn read_write_access(
|
||||
self: Pin<&mut Self>,
|
||||
now: Instant,
|
||||
) -> Result<ReadWriteAccess, &io::Error> {
|
||||
let this = self.project();
|
||||
|
||||
debug_assert!(this
|
||||
.read_write_now
|
||||
.as_ref()
|
||||
.map_or(true, |old_now| *old_now <= now));
|
||||
*this.read_write_wake_up_after = None;
|
||||
*this.read_write_now = Some(now);
|
||||
|
||||
if let Some(error) = this.error.as_ref() {
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
this.read_buffer.truncate(*this.read_buffer_valid);
|
||||
|
||||
let write_bytes_queued = this.write_buffers.iter().map(Vec::len).sum();
|
||||
|
||||
Ok(ReadWriteAccess {
|
||||
read_buffer_len_before: this.read_buffer.len(),
|
||||
write_buffers_len_before: this.write_buffers.len(),
|
||||
read_write: read_write::ReadWrite {
|
||||
now,
|
||||
incoming_buffer: mem::take(this.read_buffer),
|
||||
expected_incoming_bytes: if !*this.read_closed { Some(0) } else { None },
|
||||
read_bytes: 0,
|
||||
write_bytes_queued,
|
||||
write_buffers: mem::take(this.write_buffers),
|
||||
write_bytes_queueable: if !*this.write_closed {
|
||||
// Limit outgoing buffer size to 128kiB.
|
||||
Some(WRITE_BYTES_QUEUEABLE.saturating_sub(write_bytes_queued))
|
||||
} else {
|
||||
None
|
||||
},
|
||||
wake_up_after: this.read_write_wake_up_after.take(),
|
||||
},
|
||||
StreamWriteBuffer::Open {
|
||||
buffer: VecDeque::with_capacity(16384),
|
||||
must_close: false,
|
||||
must_flush: false,
|
||||
},
|
||||
)),
|
||||
},
|
||||
))
|
||||
read_buffer: this.read_buffer,
|
||||
read_buffer_valid: this.read_buffer_valid,
|
||||
read_buffer_reasonable_capacity: *this.read_buffer_reasonable_capacity,
|
||||
write_buffers: this.write_buffers,
|
||||
write_closed: this.write_closed,
|
||||
close_pending: this.close_pending,
|
||||
read_write_wake_up_after: this.read_write_wake_up_after,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> WithBuffers<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
{
|
||||
/// Waits until [`WithBuffers::read_write_access`] should be called again.
|
||||
///
|
||||
/// Returns if an error happens on the socket. If an error happened in the past on the socket,
|
||||
/// the future never yields.
|
||||
pub async fn wait_read_write_again<F>(
|
||||
self: Pin<&mut Self>,
|
||||
timer_builder: impl FnOnce(Instant) -> F,
|
||||
) where
|
||||
F: future::Future<Output = ()>,
|
||||
{
|
||||
let mut this = self.project();
|
||||
|
||||
// Return immediately if `wake_up_after <= now`.
|
||||
match (&*this.read_write_wake_up_after, &*this.read_write_now) {
|
||||
(Some(when_wake_up), Some(now)) if *when_wake_up <= *now => {
|
||||
return;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let mut timer = pin::pin!({
|
||||
let fut = this
|
||||
.read_write_wake_up_after
|
||||
.as_ref()
|
||||
.map(|when| timer_builder(*when));
|
||||
async {
|
||||
if let Some(fut) = fut {
|
||||
fut.await;
|
||||
} else {
|
||||
future::pending::<()>().await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Grow the read buffer in order to make space for potentially more data.
|
||||
this.read_buffer.resize(this.read_buffer.capacity(), 0);
|
||||
|
||||
future::poll_fn(move |cx| {
|
||||
if this.error.is_some() {
|
||||
// Never return.
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
// If still `true` at the end of the function, `Poll::Pending` is returned.
|
||||
let mut pending = true;
|
||||
|
||||
match future::Future::poll(Pin::new(&mut timer), cx) {
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(()) => {
|
||||
pending = false;
|
||||
}
|
||||
}
|
||||
|
||||
if !*this.read_closed {
|
||||
let read_result = AsyncRead::poll_read(
|
||||
this.socket.as_mut(),
|
||||
cx,
|
||||
&mut this.read_buffer[*this.read_buffer_valid..],
|
||||
);
|
||||
|
||||
match read_result {
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(Ok(0)) => {
|
||||
*this.read_closed = true;
|
||||
pending = false;
|
||||
}
|
||||
Poll::Ready(Ok(n)) => {
|
||||
*this.read_buffer_valid += n;
|
||||
// TODO: consider waking up only if the expected bytes of the consumer are exceeded
|
||||
pending = false;
|
||||
}
|
||||
Poll::Ready(Err(err)) => {
|
||||
*this.error = Some(err);
|
||||
return Poll::Ready(());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
loop {
|
||||
if this.write_buffers.iter().any(|b| !b.is_empty()) {
|
||||
let write_result = {
|
||||
let buffers = this
|
||||
.write_buffers
|
||||
.iter()
|
||||
.map(|buf| io::IoSlice::new(buf))
|
||||
.collect::<Vec<_>>();
|
||||
AsyncWrite::poll_write_vectored(this.socket.as_mut(), cx, &buffers)
|
||||
};
|
||||
|
||||
match write_result {
|
||||
Poll::Ready(Ok(0)) => {
|
||||
// It is not legal for `poll_write` to return 0 bytes written.
|
||||
unreachable!();
|
||||
}
|
||||
Poll::Ready(Ok(mut n)) => {
|
||||
*this.flush_pending = true;
|
||||
while n > 0 {
|
||||
let first_buf = this.write_buffers.first_mut().unwrap();
|
||||
if first_buf.len() <= n {
|
||||
n -= first_buf.len();
|
||||
this.write_buffers.remove(0);
|
||||
} else {
|
||||
// TODO: consider keeping the buffer as is but starting the next write at a later offset
|
||||
first_buf.copy_within(n.., 0);
|
||||
first_buf.truncate(first_buf.len() - n);
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Wake up if the write buffers switch from non-empty to empty.
|
||||
if this.write_buffers.is_empty() {
|
||||
pending = false;
|
||||
}
|
||||
}
|
||||
Poll::Ready(Err(err)) => {
|
||||
*this.error = Some(err);
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Poll::Pending => break,
|
||||
};
|
||||
} else if *this.flush_pending {
|
||||
match AsyncWrite::poll_flush(this.socket.as_mut(), cx) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
*this.flush_pending = false;
|
||||
}
|
||||
Poll::Ready(Err(err)) => {
|
||||
*this.error = Some(err);
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Poll::Pending => break,
|
||||
}
|
||||
} else if *this.close_pending {
|
||||
match AsyncWrite::poll_close(this.socket.as_mut(), cx) {
|
||||
Poll::Ready(Ok(())) => {
|
||||
*this.close_pending = false;
|
||||
pending = false;
|
||||
break;
|
||||
}
|
||||
Poll::Ready(Err(err)) => {
|
||||
*this.error = Some(err);
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Poll::Pending => break,
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if !pending {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
})
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: fmt::Debug> fmt::Debug for WithBuffers<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.debug_tuple("WithBuffers").field(&self.socket).finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// See [`WithBuffers::read_write_access`].
|
||||
pub struct ReadWriteAccess<'a> {
|
||||
read_write: read_write::ReadWrite<Instant>,
|
||||
|
||||
read_buffer_len_before: usize,
|
||||
write_buffers_len_before: usize,
|
||||
|
||||
// Fields below as references from the content of the `WithBuffers`.
|
||||
read_buffer: &'a mut Vec<u8>,
|
||||
read_buffer_valid: &'a mut usize,
|
||||
read_buffer_reasonable_capacity: usize,
|
||||
write_buffers: &'a mut Vec<Vec<u8>>,
|
||||
write_closed: &'a mut bool,
|
||||
close_pending: &'a mut bool,
|
||||
read_write_wake_up_after: &'a mut Option<Instant>,
|
||||
}
|
||||
|
||||
impl<'a> ops::Deref for ReadWriteAccess<'a> {
|
||||
type Target = read_write::ReadWrite<Instant>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.read_write
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ops::DerefMut for ReadWriteAccess<'a> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.read_write
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for ReadWriteAccess<'a> {
|
||||
fn drop(&mut self) {
|
||||
*self.read_buffer = mem::take(&mut self.read_write.incoming_buffer);
|
||||
*self.read_buffer_valid = self.read_buffer.len();
|
||||
|
||||
// Adjust `read_buffer` to the number of bytes requested by the consumer.
|
||||
if let Some(expected_incoming_bytes) = self.read_write.expected_incoming_bytes {
|
||||
if expected_incoming_bytes < self.read_buffer_reasonable_capacity
|
||||
&& self.read_buffer.is_empty()
|
||||
{
|
||||
// We use `shrink_to(0)` then `reserve(cap)` rather than just `shrink_to(cap)`
|
||||
// so that the `Vec` doesn't try to preserve the data in the read buffer.
|
||||
self.read_buffer.shrink_to(0);
|
||||
self.read_buffer
|
||||
.reserve(self.read_buffer_reasonable_capacity);
|
||||
} else if expected_incoming_bytes > self.read_buffer.len() {
|
||||
self.read_buffer
|
||||
.reserve(expected_incoming_bytes - self.read_buffer.len());
|
||||
}
|
||||
debug_assert!(self.read_buffer.capacity() >= expected_incoming_bytes);
|
||||
}
|
||||
|
||||
*self.write_buffers = mem::take(&mut self.read_write.write_buffers);
|
||||
|
||||
if self.read_write.write_bytes_queueable.is_none() && !*self.write_closed {
|
||||
*self.write_closed = true;
|
||||
*self.close_pending = true;
|
||||
}
|
||||
|
||||
*self.read_write_wake_up_after = self.read_write.wake_up_after.take();
|
||||
|
||||
// If the consumer has advanced its reading or writing sides, we make the next call to
|
||||
// `read_write_access` return immediately by setting `wake_up_after`.
|
||||
if (self.read_buffer_len_before != self.read_buffer.len()
|
||||
&& self
|
||||
.read_write
|
||||
.expected_incoming_bytes
|
||||
.map_or(false, |b| b <= self.read_buffer.len()))
|
||||
|| (self.write_buffers_len_before != self.write_buffers.len()
|
||||
&& !*self.write_closed)
|
||||
{
|
||||
*self.read_write_wake_up_after = Some(self.read_write.now);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,21 +2,22 @@
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use super::wasm_socket::WasmSocket;
|
||||
|
||||
use core::time::Duration;
|
||||
use futures::{prelude::*, task::Poll};
|
||||
|
||||
use smoldot::libp2p::multiaddr::Multiaddr;
|
||||
use futures::prelude::*;
|
||||
use smoldot_light::platform::{
|
||||
ConnectError, PlatformConnection, PlatformRef, PlatformSubstreamDirection, ReadBuffer,
|
||||
Address, ConnectError, ConnectionType, IpAddr, MultiStreamAddress, MultiStreamWebRtcConnection,
|
||||
PlatformRef, SubstreamDirection,
|
||||
};
|
||||
use std::{io::IoSlice, pin::Pin};
|
||||
use std::{io, net::SocketAddr, pin::Pin};
|
||||
|
||||
use super::wasm_helpers::{StreamReadBuffer, StreamWriteBuffer};
|
||||
const LOG_TARGET: &str = "subxt-platform-wasm";
|
||||
|
||||
/// Subxt plaform implementation for wasm.
|
||||
/// Subxt platform implementation for wasm.
|
||||
///
|
||||
/// This implementation is a conversion of the implementation from the smoldot:
|
||||
/// https://github.com/smol-dot/smoldot/blob/f49ce4ea6a325c444ab6ad37d3ab5558edf0d541/light-base/src/platform/default.rs#L52.
|
||||
/// https://github.com/smol-dot/smoldot/blob/6401d4df90569e23073d646b14a8fbf9f7e6bdd3/light-base/src/platform/default.rs#L83.
|
||||
///
|
||||
/// This platform will evolve over time and we'll need to keep this code in sync.
|
||||
#[derive(Clone)]
|
||||
@@ -30,17 +31,18 @@ impl SubxtPlatform {
|
||||
|
||||
impl PlatformRef for SubxtPlatform {
|
||||
type Delay = super::wasm_helpers::Delay;
|
||||
type Yield = future::Ready<()>;
|
||||
type Instant = super::wasm_helpers::Instant;
|
||||
type Connection = std::convert::Infallible;
|
||||
type MultiStream = std::convert::Infallible;
|
||||
type Stream = super::wasm_helpers::Stream;
|
||||
type ConnectFuture = future::BoxFuture<
|
||||
type StreamConnectFuture = future::BoxFuture<'static, Result<Self::Stream, ConnectError>>;
|
||||
type MultiStreamConnectFuture = future::BoxFuture<
|
||||
'static,
|
||||
Result<PlatformConnection<Self::Stream, Self::Connection>, ConnectError>,
|
||||
Result<MultiStreamWebRtcConnection<Self::MultiStream>, ConnectError>,
|
||||
>;
|
||||
type ReadWriteAccess<'a> = super::wasm_helpers::with_buffers::ReadWriteAccess<'a>;
|
||||
type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
|
||||
type NextSubstreamFuture<'a> =
|
||||
future::Pending<Option<(Self::Stream, PlatformSubstreamDirection)>>;
|
||||
type StreamErrorRef<'a> = &'a std::io::Error;
|
||||
type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>;
|
||||
|
||||
fn now_from_unix_epoch(&self) -> Duration {
|
||||
super::wasm_helpers::now_from_unix_epoch()
|
||||
@@ -50,6 +52,13 @@ impl PlatformRef for SubxtPlatform {
|
||||
super::wasm_helpers::now()
|
||||
}
|
||||
|
||||
fn fill_random_bytes(&self, buffer: &mut [u8]) {
|
||||
// This could fail if the system does not have access to a good source of entropy.
|
||||
// Note: `rand::RngCore::fill_bytes` also panics on errors and `rand::OsCore` calls
|
||||
// identically into `getrandom::getrandom`.
|
||||
getrandom::getrandom(buffer).expect("Cannot fill random bytes");
|
||||
}
|
||||
|
||||
fn sleep(&self, duration: Duration) -> Self::Delay {
|
||||
super::wasm_helpers::sleep(duration)
|
||||
}
|
||||
@@ -58,251 +67,12 @@ impl PlatformRef for SubxtPlatform {
|
||||
self.sleep(when.saturating_duration_since(self.now()))
|
||||
}
|
||||
|
||||
fn yield_after_cpu_intensive(&self) -> Self::Yield {
|
||||
// No-op.
|
||||
future::ready(())
|
||||
}
|
||||
|
||||
fn connect(&self, multiaddr: &str) -> Self::ConnectFuture {
|
||||
// We simply copy the address to own it. We could be more zero-cost here, but doing so
|
||||
// would considerably complicate the implementation.
|
||||
let multiaddr = multiaddr.to_owned();
|
||||
|
||||
tracing::debug!("Connecting to multiaddress={:?}", multiaddr);
|
||||
|
||||
Box::pin(async move {
|
||||
let addr = multiaddr.parse::<Multiaddr>().map_err(|_| ConnectError {
|
||||
is_bad_addr: true,
|
||||
message: "Failed to parse address".to_string(),
|
||||
})?;
|
||||
|
||||
let mut iter = addr.iter().fuse();
|
||||
let proto1 = iter.next().ok_or(ConnectError {
|
||||
is_bad_addr: true,
|
||||
message: "Unknown protocols combination".to_string(),
|
||||
})?;
|
||||
let proto2 = iter.next().ok_or(ConnectError {
|
||||
is_bad_addr: true,
|
||||
message: "Unknown protocols combination".to_string(),
|
||||
})?;
|
||||
let proto3 = iter.next();
|
||||
|
||||
if iter.next().is_some() {
|
||||
return Err(ConnectError {
|
||||
is_bad_addr: true,
|
||||
message: "Unknown protocols combination".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
super::wasm_helpers::connect(proto1, proto2, proto3).await
|
||||
})
|
||||
}
|
||||
|
||||
fn open_out_substream(&self, c: &mut Self::Connection) {
|
||||
// This function can only be called with so-called "multi-stream" connections. We never
|
||||
// open such connection.
|
||||
match *c {}
|
||||
}
|
||||
|
||||
fn next_substream<'a>(&self, c: &'a mut Self::Connection) -> Self::NextSubstreamFuture<'a> {
|
||||
// This function can only be called with so-called "multi-stream" connections. We never
|
||||
// open such connection.
|
||||
match *c {}
|
||||
}
|
||||
|
||||
fn update_stream<'a>(&self, stream: &'a mut Self::Stream) -> Self::StreamUpdateFuture<'a> {
|
||||
Box::pin(future::poll_fn(|cx| {
|
||||
// The `connect` is expected to be called before this method and would populate
|
||||
// the buffers properly. When the buffers are empty, this future is shortly dropped.
|
||||
let Some((read_buffer, write_buffer)) = stream.buffers.as_mut() else {
|
||||
return Poll::Pending;
|
||||
};
|
||||
|
||||
// Whether the future returned by `update_stream` should return `Ready` or `Pending`.
|
||||
let mut update_stream_future_ready = false;
|
||||
|
||||
if let StreamReadBuffer::Open {
|
||||
buffer: ref mut buf,
|
||||
ref mut cursor,
|
||||
} = read_buffer
|
||||
{
|
||||
// When reading data from the socket, `poll_read` might return "EOF". In that
|
||||
// situation, we transition to the `Closed` state, which would discard the data
|
||||
// currently in the buffer. For this reason, we only try to read if there is no
|
||||
// data left in the buffer.
|
||||
if cursor.start == cursor.end {
|
||||
if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_read(cx, buf) {
|
||||
update_stream_future_ready = true;
|
||||
match result {
|
||||
Err(_) => {
|
||||
// End the stream.
|
||||
stream.buffers = None;
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Ok(0) => {
|
||||
// EOF.
|
||||
*read_buffer = StreamReadBuffer::Closed;
|
||||
}
|
||||
Ok(bytes) => {
|
||||
*cursor = 0..bytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let StreamWriteBuffer::Open {
|
||||
buffer: ref mut buf,
|
||||
must_flush,
|
||||
must_close,
|
||||
} = write_buffer
|
||||
{
|
||||
while !buf.is_empty() {
|
||||
let write_queue_slices = buf.as_slices();
|
||||
if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_write_vectored(
|
||||
cx,
|
||||
&[
|
||||
IoSlice::new(write_queue_slices.0),
|
||||
IoSlice::new(write_queue_slices.1),
|
||||
],
|
||||
) {
|
||||
if !*must_close {
|
||||
// In the situation where the API user wants to close the writing
|
||||
// side, simply sending the buffered data isn't enough to justify
|
||||
// making the future ready.
|
||||
update_stream_future_ready = true;
|
||||
}
|
||||
|
||||
match result {
|
||||
Err(_) => {
|
||||
// End the stream.
|
||||
stream.buffers = None;
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Ok(bytes) => {
|
||||
*must_flush = true;
|
||||
for _ in 0..bytes {
|
||||
buf.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if buf.is_empty() && *must_close {
|
||||
if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_close(cx) {
|
||||
update_stream_future_ready = true;
|
||||
match result {
|
||||
Err(_) => {
|
||||
// End the stream.
|
||||
stream.buffers = None;
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Ok(()) => {
|
||||
*write_buffer = StreamWriteBuffer::Closed;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if *must_flush {
|
||||
if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_flush(cx) {
|
||||
update_stream_future_ready = true;
|
||||
match result {
|
||||
Err(_) => {
|
||||
// End the stream.
|
||||
stream.buffers = None;
|
||||
return Poll::Ready(());
|
||||
}
|
||||
Ok(()) => {
|
||||
*must_flush = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if update_stream_future_ready {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
// Progress cannot be made since poll_read, poll_write, poll_close, poll_flush
|
||||
// are not ready yet. Smoldot drops this future and calls it again with the
|
||||
// next processing iteration.
|
||||
Poll::Pending
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn read_buffer<'a>(&self, stream: &'a mut Self::Stream) -> ReadBuffer<'a> {
|
||||
match stream.buffers.as_ref().map(|(r, _)| r) {
|
||||
None => ReadBuffer::Reset,
|
||||
Some(StreamReadBuffer::Closed) => ReadBuffer::Closed,
|
||||
Some(StreamReadBuffer::Open { buffer, cursor }) => {
|
||||
ReadBuffer::Open(&buffer[cursor.clone()])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn advance_read_cursor(&self, stream: &mut Self::Stream, extra_bytes: usize) {
|
||||
let Some(StreamReadBuffer::Open { ref mut cursor, .. }) =
|
||||
stream.buffers.as_mut().map(|(r, _)| r)
|
||||
else {
|
||||
assert_eq!(extra_bytes, 0);
|
||||
return;
|
||||
};
|
||||
|
||||
assert!(cursor.start + extra_bytes <= cursor.end);
|
||||
cursor.start += extra_bytes;
|
||||
}
|
||||
|
||||
fn writable_bytes(&self, stream: &mut Self::Stream) -> usize {
|
||||
let Some(StreamWriteBuffer::Open {
|
||||
ref mut buffer,
|
||||
must_close: false,
|
||||
..
|
||||
}) = stream.buffers.as_mut().map(|(_, w)| w)
|
||||
else {
|
||||
return 0;
|
||||
};
|
||||
buffer.capacity() - buffer.len()
|
||||
}
|
||||
|
||||
fn send(&self, stream: &mut Self::Stream, data: &[u8]) {
|
||||
debug_assert!(!data.is_empty());
|
||||
|
||||
// Because `writable_bytes` returns 0 if the writing side is closed, and because `data`
|
||||
// must always have a size inferior or equal to `writable_bytes`, we know for sure that
|
||||
// the writing side isn't closed.
|
||||
let Some(StreamWriteBuffer::Open { ref mut buffer, .. }) =
|
||||
stream.buffers.as_mut().map(|(_, w)| w)
|
||||
else {
|
||||
panic!()
|
||||
};
|
||||
buffer.reserve(data.len());
|
||||
buffer.extend(data.iter().copied());
|
||||
}
|
||||
|
||||
fn close_send(&self, stream: &mut Self::Stream) {
|
||||
// It is not illegal to call this on an already-reset stream.
|
||||
let Some((_, write_buffer)) = stream.buffers.as_mut() else {
|
||||
return;
|
||||
};
|
||||
|
||||
match write_buffer {
|
||||
StreamWriteBuffer::Open {
|
||||
must_close: must_close @ false,
|
||||
..
|
||||
} => *must_close = true,
|
||||
_ => {
|
||||
// However, it is illegal to call this on a stream that was already close
|
||||
// attempted.
|
||||
panic!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_task(&self, _: std::borrow::Cow<str>, task: future::BoxFuture<'static, ()>) {
|
||||
super::wasm_helpers::spawn(task);
|
||||
fn spawn_task(
|
||||
&self,
|
||||
_task_name: std::borrow::Cow<str>,
|
||||
task: impl future::Future<Output = ()> + Send + 'static,
|
||||
) {
|
||||
wasm_bindgen_futures::spawn_local(task);
|
||||
}
|
||||
|
||||
fn client_name(&self) -> std::borrow::Cow<str> {
|
||||
@@ -312,4 +82,112 @@ impl PlatformRef for SubxtPlatform {
|
||||
fn client_version(&self) -> std::borrow::Cow<str> {
|
||||
env!("CARGO_PKG_VERSION").into()
|
||||
}
|
||||
|
||||
fn supports_connection_type(&self, connection_type: ConnectionType) -> bool {
|
||||
let result = matches!(
|
||||
connection_type,
|
||||
ConnectionType::WebSocketIpv4 { .. }
|
||||
| ConnectionType::WebSocketIpv6 { .. }
|
||||
| ConnectionType::WebSocketDns { .. }
|
||||
);
|
||||
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Supports connection type={:?} result={}",
|
||||
connection_type, result
|
||||
);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture {
|
||||
tracing::trace!(target: LOG_TARGET, "Connect stream to multiaddr={:?}", multiaddr);
|
||||
|
||||
// `PlatformRef` trait guarantees that `connect_stream` is only called with addresses
|
||||
// stated in `supports_connection_type`.
|
||||
let addr = match multiaddr {
|
||||
Address::WebSocketDns {
|
||||
hostname,
|
||||
port,
|
||||
secure: true,
|
||||
} => {
|
||||
format!("wss://{}:{}", hostname, port)
|
||||
}
|
||||
Address::WebSocketDns {
|
||||
hostname,
|
||||
port,
|
||||
secure: false,
|
||||
} => {
|
||||
format!("ws://{}:{}", hostname, port)
|
||||
}
|
||||
Address::WebSocketIp {
|
||||
ip: IpAddr::V4(ip),
|
||||
port,
|
||||
} => {
|
||||
let addr = SocketAddr::from((ip, port));
|
||||
format!("ws://{}", addr.to_string())
|
||||
}
|
||||
Address::WebSocketIp {
|
||||
ip: IpAddr::V6(ip),
|
||||
port,
|
||||
} => {
|
||||
let addr = SocketAddr::from((ip, port));
|
||||
format!("ws://{}", addr.to_string())
|
||||
}
|
||||
|
||||
// The API user of the `PlatformRef` trait is never supposed to open connections of
|
||||
// a type that isn't supported.
|
||||
_ => {
|
||||
unreachable!("Connecting to an address not supported. This code path indicates a bug in smoldot. Please raise an issue at https://github.com/smol-dot/smoldot/issues")
|
||||
}
|
||||
};
|
||||
|
||||
Box::pin(async move {
|
||||
tracing::debug!(target: LOG_TARGET, "Connecting to addr={addr}");
|
||||
|
||||
let socket = WasmSocket::new(addr.as_str()).map_err(|err| ConnectError {
|
||||
message: format!("Failed to reach peer: {err}"),
|
||||
})?;
|
||||
|
||||
Ok(super::wasm_helpers::Stream(
|
||||
super::wasm_helpers::with_buffers::WithBuffers::new(socket),
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
fn connect_multistream(&self, _address: MultiStreamAddress) -> Self::MultiStreamConnectFuture {
|
||||
panic!("Multistreams are not currently supported. This code path indicates a bug in smoldot. Please raise an issue at https://github.com/smol-dot/smoldot/issues")
|
||||
}
|
||||
|
||||
fn open_out_substream(&self, c: &mut Self::MultiStream) {
|
||||
// This function can only be called with so-called "multi-stream" connections. We never
|
||||
// open such connection.
|
||||
match *c {}
|
||||
}
|
||||
|
||||
fn next_substream(&self, c: &'_ mut Self::MultiStream) -> Self::NextSubstreamFuture<'_> {
|
||||
// This function can only be called with so-called "multi-stream" connections. We never
|
||||
// open such connection.
|
||||
match *c {}
|
||||
}
|
||||
|
||||
fn read_write_access<'a>(
|
||||
&self,
|
||||
stream: Pin<&'a mut Self::Stream>,
|
||||
) -> Result<Self::ReadWriteAccess<'a>, &'a io::Error> {
|
||||
let stream = stream.project();
|
||||
stream.0.read_write_access(Self::Instant::now())
|
||||
}
|
||||
|
||||
fn wait_read_write_again<'a>(
|
||||
&self,
|
||||
stream: Pin<&'a mut Self::Stream>,
|
||||
) -> Self::StreamUpdateFuture<'a> {
|
||||
let stream = stream.project();
|
||||
Box::pin(stream.0.wait_read_write_again(|when| async move {
|
||||
let now = super::wasm_helpers::now();
|
||||
let duration = when.saturating_duration_since(now);
|
||||
super::wasm_helpers::sleep(duration).await;
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user