From e11604411d1fff3c25f9321f7012303f2edbf2a0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 7 Jun 2023 15:42:01 +0300 Subject: [PATCH] Low level streams impl Signed-off-by: Alexandru Vasile --- subxt/src/rpc/lightclient/platform.rs | 509 ++++++++++++++++++++------ testing/wasm-tests/tests/wasm.rs | 48 ++- 2 files changed, 425 insertions(+), 132 deletions(-) diff --git a/subxt/src/rpc/lightclient/platform.rs b/subxt/src/rpc/lightclient/platform.rs index d46fe7ae81..065eef0ec5 100644 --- a/subxt/src/rpc/lightclient/platform.rs +++ b/subxt/src/rpc/lightclient/platform.rs @@ -1,4 +1,5 @@ use futures_timer::Delay; +use futures_util::AsyncWriteExt; use futures_util::{future, FutureExt}; use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError}; @@ -13,12 +14,18 @@ use smoldot_light::platform::PlatformSubstreamDirection; use std::sync::Arc; use std::sync::Mutex; +use std::task::Context; use std::{ io::IoSlice, net::{IpAddr, SocketAddr}, }; +use core::task::Poll; + +use core::ops; +use core::pin::Pin; use core::{mem, pin, str, task, time::Duration}; + use std::{ borrow::Cow, collections::{BTreeMap, VecDeque}, @@ -26,12 +33,16 @@ use std::{ time::{Instant, SystemTime, UNIX_EPOCH}, }; +use smoldot_light::platform::ReadBuffer; + use tokio::sync::{mpsc, oneshot}; +/// Wasm compatible light-client platform for executing low-level operations. #[derive(Clone)] pub struct Platform {} impl Platform { + /// Constructs a new [`Platform`]. pub const fn new() -> Self { Self {} } @@ -60,42 +71,36 @@ impl smoldot_light::platform::PlatformRef for Platform { future::Pending>; fn now_from_unix_epoch(&self) -> instant::Duration { - tracing::trace!("[call] now_from_unix_epoch"); + tracing::trace!("[now_from_unix_epoch]"); // The documentation of `now_from_unix_epoch()` mentions that it's ok to panic if we're // before the UNIX epoch. - let res = instant::SystemTime::now() .duration_since(instant::SystemTime::UNIX_EPOCH) - .unwrap_or_else(|_| panic!()); - - // let res = std::time::UNIX_EPOCH - // .elapsed() - // .expect("Invalid systime cannot be configured earlier than `UNIX_EPOCH`"); - tracing::trace!("[response] now_from_unix_epoch={:?}", res); + .unwrap_or_else(|_| { + panic!("Invalid systime cannot be configured earlier than `UNIX_EPOCH`") + }); + tracing::trace!("[now_from_unix_epoch] result={:?}", res); res } fn now(&self) -> Self::Instant { - // tracing::trace!("[call] now"); - let now = instant::Instant::now(); - // tracing::trace!("[res] now ={:?}", now); - now + // tracing::trace!("[now]"); + + instant::Instant::now() } fn sleep(&self, duration: Duration) -> Self::Delay { - tracing::trace!("[call] sleep"); - let future = futures_timer::Delay::new(duration).boxed(); - tracing::trace!("[res] sleep"); - future + tracing::trace!("[sleep] duration={:?}", duration); + + futures_timer::Delay::new(duration).boxed() } fn sleep_until(&self, when: Self::Instant) -> Self::Delay { - tracing::trace!("[call] sleep_until"); - let res = self.sleep(when.saturating_duration_since(self.now())); - tracing::trace!("[res] sleep_until"); - res + tracing::trace!("[sleep_until] when={:?}", when); + + self.sleep(when.saturating_duration_since(self.now())) } fn spawn_task( @@ -103,7 +108,7 @@ impl smoldot_light::platform::PlatformRef for Platform { task_name: std::borrow::Cow, task: futures_util::future::BoxFuture<'static, ()>, ) { - tracing::trace!("[call] spawn_task task_name={:?}", task_name); + tracing::trace!("[spawn_task] task_name={:?}", task_name); wasm_bindgen_futures::spawn_local(task) } @@ -117,36 +122,45 @@ impl smoldot_light::platform::PlatformRef for Platform { } fn yield_after_cpu_intensive(&self) -> Self::Yield { + tracing::trace!("[yield_after_cpu_intensive]"); future::ready(()) } fn connect(&self, url: &str) -> Self::ConnectFuture { - tracing::trace!("[call] connect url={:?}", url); + tracing::trace!("[connect] url={:?}", url); let url = url.to_string(); - Box::pin(async move { - // let url = url.to_string(); - - let multiaddr = url.parse::().map_err(|_| ConnectError { - message: format!("Address {url} is not a valid multiaddress"), - is_bad_addr: true, + let multiaddr = url.parse::().map_err(|err| { + tracing::trace!("[connect] Address provided {} is invalid {:?}", url, err); + ConnectError { + message: format!("Address {url} is not a valid multiaddress"), + is_bad_addr: true, + } })?; // First two protocals must be valid, the third one is optional. let mut proto_iter = multiaddr.iter().fuse(); - let addr = match ( - proto_iter.next().ok_or(ConnectError { + let proto1 = proto_iter.next().ok_or_else(|| { + tracing::trace!("[connect] Cannot find first protocol"); + ConnectError { message: format!("Unknown protocol combination"), is_bad_addr: true, - })?, - proto_iter.next().ok_or(ConnectError { + } + })?; + + let proto2 = proto_iter.next().ok_or_else(|| { + tracing::trace!("[connect] Cannot find second protocol"); + ConnectError { message: format!("Unknown protocol combination"), is_bad_addr: true, - })?, - proto_iter.next(), - ) { + } + })?; + + let proto3 = proto_iter.next(); + + let addr = match (proto1, proto2, proto3) { (ProtocolRef::Ip4(ip), ProtocolRef::Tcp(port), None) => { SocketAddr::new(IpAddr::V4((ip).into()), port) } @@ -159,61 +173,43 @@ impl smoldot_light::platform::PlatformRef for Platform { (ProtocolRef::Ip6(ip), ProtocolRef::Tcp(port), Some(ProtocolRef::Ws)) => { SocketAddr::new(IpAddr::V6((ip).into()), port) } - // TODO: Minimal protocol, check that basic connection is working. - // TODO: we don't care about the differences between Dns, Dns4, and Dns6 - // ( - // ProtocolRef::Dns(addr) | ProtocolRef::Dns4(addr) | ProtocolRef::Dns6(addr), - // ProtocolRef::Tcp(port), - // None, - // ) => (either::Right((addr.to_string(), *port)), None), - // ( - // ProtocolRef::Dns(addr) | ProtocolRef::Dns4(addr) | ProtocolRef::Dns6(addr), - // ProtocolRef::Tcp(port), - // Some(ProtocolRef::Ws), - // ) => ( - // either::Right((addr.to_string(), *port)), - // Some(format!("{}:{}", addr, *port)), - // ), _ => { + tracing::warn!("[connect] Unknown protocol combination"); + return Err(ConnectError { is_bad_addr: true, message: "Unknown protocols combination".to_string(), - }) + }); } }; + let addr = format!("ws://{}", addr.to_string()); + tracing::trace!("[connect] Connecting to addr={:?}", addr); + // TODO: use `addr` instead. - let websocket = WebSocket::open(url.as_ref()).map_err(|e| ConnectError { - is_bad_addr: false, - message: "Cannot stablish WebSocket connection".to_string(), + let websocket = WebSocket::open(addr.as_ref()).map_err(|err| { + tracing::trace!("[connect] Cannot connect to add {:?}", err); + ConnectError { + is_bad_addr: false, + message: "Cannot stablish WebSocket connection".to_string(), + } })?; + tracing::trace!("[connect] Connection established"); - // let (to_sender, from_sender) = mpsc::channel(1024); - // let (to_receiver, from_receiver) = mpsc::channel(1024); - - //TODO: Spawn a task: - // enum Protocol { - // Send(String), - // Recv(String), - // } - // - // bg_task { - // while .. - // Future: - // A: if Send() .. self.send(); - // B: self.next() -> recv_end.send("Msg"); - // - // } - // - // from fn send(&self, stream: &mut Self::Stream, data: &[u8]) { - // let str: String = data.into(); - // sender.send(str); - // } - // - // from fn read_buffer<'a> { let (sender, receiver) = websocket.split(); let conn = ConnectionStream { inner: Arc::new(Mutex::new(ConnectionInner { sender, receiver })), + buffers: Some(( + StreamReadBuffer::Open { + buffer: vec![0; 16384], + cursor: 0..0, + }, + StreamWriteBuffer::Open { + buffer: VecDeque::with_capacity(16384), + must_close: false, + must_flush: false, + }, + )), }; Ok(PlatformConnection::SingleStreamMultistreamSelectNoiseYamux( @@ -223,6 +219,8 @@ impl smoldot_light::platform::PlatformRef for Platform { } fn open_out_substream(&self, _connection: &mut Self::Connection) { + tracing::trace!("[call] open_out_substream"); + // Called from MultiStream connections that are never opened for this implementation. } @@ -230,87 +228,358 @@ impl smoldot_light::platform::PlatformRef for Platform { &self, connection: &'a mut Self::Connection, ) -> Self::NextSubstreamFuture<'a> { + tracing::trace!("[call] next_substream"); // Called from MultiStream connections that are never opened for this implementation. // futures::future::pending::>() futures::future::pending() } fn update_stream<'a>(&self, stream: &'a mut Self::Stream) -> Self::StreamUpdateFuture<'a> { - tracing::trace!("[call] update_stream"); + tracing::trace!("[update_stream]"); - Box::pin(async move {}) + use futures::Future; + + Box::pin(future::poll_fn(|cx| { + let Some((read_buffer, write_buffer)) = stream.buffers.as_mut() else { + tracing::trace!("[update_stream] Buffers are empty"); + return Poll::Pending + }; + + let mut locked = stream.inner.lock().unwrap(); + + // Whether the future returned by `update_stream` should return `Ready` or `Pending`. + let mut update_stream_future_ready = false; + + if let StreamReadBuffer::Open { + buffer: ref mut buf, + ref mut cursor, + } = read_buffer + { + tracing::trace!("[update_stream] StreamReadBuffer is open"); + + // When reading data from the socket, `poll_read` might return "EOF". In that + // situation, we transition to the `Closed` state, which would discard the data + // currently in the buffer. For this reason, we only try to read if there is no + // data left in the buffer. + if cursor.start == cursor.end { + let mut stream_recv = locked.receiver.next(); + if let Poll::Ready(result) = Pin::new(&mut stream_recv).poll(cx) { + tracing::trace!("[update_stream] Received from socket"); + update_stream_future_ready = true; + match result { + Some(Ok(message)) => { + tracing::trace!( + "[update_stream] Received from socket message={:?}", + message + ); + // These bytes must end-up in the read buffer. + let bytes = match message { + Message::Text(text) => text.into_bytes(), + Message::Bytes(bytes) => bytes, + }; + + for (index, byte) in bytes.iter().enumerate() { + buf[index] = *byte; + } + + *cursor = 0..bytes.len(); + } + Some(Err(err)) => { + tracing::warn!( + "[update_stream] Reached Websocket error: {:?}", + err + ); + + stream.buffers = None; + return Poll::Ready(()); + } + None => { + tracing::warn!("[update_stream] Reached EOF"); + // EOF. + *read_buffer = StreamReadBuffer::Closed; + } + } + } + } + } + + if let StreamWriteBuffer::Open { + buffer: ref mut buf, + must_flush, + must_close, + } = write_buffer + { + while !buf.is_empty() { + let write_queue_slices = buf.as_slices(); + let len = write_queue_slices.0.len() + write_queue_slices.1.len(); + + let slices = &[ + IoSlice::new(write_queue_slices.0), + IoSlice::new(write_queue_slices.1), + ]; + + tracing::trace!( + "[update_stream] Prepare to send first={:?}", + write_queue_slices.0 + ); + + tracing::trace!( + "[update_stream] Prepare to send second={:?}", + write_queue_slices.1 + ); + + let len = write_queue_slices.1.len(); + let message = Message::Bytes(write_queue_slices.1.to_owned()); + + tracing::trace!("[update_stream] Sending={:?} len={}", message, len); + + let mut stream_send = locked.sender.send(message); + + if let Poll::Ready(result) = Pin::new(&mut stream_send).poll(cx) { + if !*must_close { + // In the situation where the API user wants to close the writing + // side, simply sending the buffered data isn't enough to justify + // making the future ready. + update_stream_future_ready = true; + } + + match result { + Err(err) => { + tracing::trace!("[update_stream] Sending Error {:?}", err); + + // End the stream. + stream.buffers = None; + return Poll::Ready(()); + } + Ok(_) => { + tracing::trace!("[update_stream] Sending ok"); + + *must_flush = true; + for _ in 0..len { + buf.pop_front(); + } + } + } + } else { + break; + } + } + + // if buf.is_empty() && *must_close { + // if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_close(cx) { + // update_stream_future_ready = true; + // match result { + // Err(_) => { + // // End the stream. + // stream.buffers = None; + // return Poll::Ready(()); + // } + // Ok(()) => { + // *write_buffer = StreamWriteBuffer::Closed; + // } + // } + // } + // } else if *must_flush { + // if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_flush(cx) { + // update_stream_future_ready = true; + // match result { + // Err(_) => { + // // End the stream. + // stream.buffers = None; + // return Poll::Ready(()); + // } + // Ok(()) => { + // *must_flush = false; + // } + // } + // } + // } + } + + if update_stream_future_ready { + tracing::trace!("[update_stream] Future ready"); + + Poll::Ready(()) + } else { + tracing::trace!("[update_stream] Future pending"); + + Poll::Pending + } + })) + // Box::pin(async move { + + // }) } fn read_buffer<'a>( &self, stream: &'a mut Self::Stream, ) -> smoldot_light::platform::ReadBuffer<'a> { - tracing::trace!("[call] read_buffer"); + tracing::trace!("[read_buffer]"); - let mut locked = stream - .inner - .lock() - .expect("Mutex should not be poised; qed"); - - // let msg = futures_executor::block_on(async { - let msg = futures::executor::block_on(async { - match locked.receiver.next().await { - Some(Ok(msg)) => Some(msg), - _ => None, + match stream.buffers.as_ref().map(|(r, _)| r) { + None => ReadBuffer::Reset, + Some(StreamReadBuffer::Closed) => ReadBuffer::Closed, + Some(StreamReadBuffer::Open { buffer, cursor }) => { + ReadBuffer::Open(&buffer[cursor.clone()]) } - }); - - match msg { - Some(msg) => { - let msg = Box::leak(Box::new(msg)); - - match msg { - Message::Text(text) => { - smoldot_light::platform::ReadBuffer::Open(text.as_bytes()) - } - Message::Bytes(bytes) => smoldot_light::platform::ReadBuffer::Open(bytes), - } - } - None => smoldot_light::platform::ReadBuffer::Closed, } + + // let mut locked = stream + // .inner + // .lock() + // .expect("Mutex should not be poised; qed"); + + // // let recv_future = Box::pin(locked.receiver.next()); + + // let mut future = locked.receiver.next(); + + // match future.poll_unpin(&mut Context::from_waker( + // futures_util::task::noop_waker_ref(), + // )) { + // task::Poll::Ready(result) => { + // tracing::warn!("Got result {:?}", result); + + // panic!("OPS with result {:?}", result); + // } + // task::Poll::Pending => { + // // panic!("OPS pending"); + + // smoldot_light::platform::ReadBuffer::Closed + // // tracing::warn!("Got pending..."); + // } + // } + + // match future::Future::poll( + // locked.receiver.next().fuse(), + // &mut Context::from_waker(futures_util::task::noop_waker_ref()), + // ) { + // task::Poll::Ready(result) => { + // tracing::warn!("Got result {:?}", result); + // } + // task::Poll::Pending => { + // tracing::warn!("Got pending..."); + // } + // }; + + // panic!("OPS - from reading"); + + // // let msg = futures_executor::block_on(async { + // let msg = futures::executor::block_on(async { + // match locked.receiver.next().await { + // Some(Ok(msg)) => Some(msg), + // _ => None, + // } + // }); + + // match msg { + // Some(msg) => { + // let msg = Box::leak(Box::new(msg)); + + // match msg { + // Message::Text(text) => { + // smoldot_light::platform::ReadBuffer::Open(text.as_bytes()) + // } + // Message::Bytes(bytes) => smoldot_light::platform::ReadBuffer::Open(bytes), + // } + // } + // None => smoldot_light::platform::ReadBuffer::Closed, + // } } fn advance_read_cursor(&self, stream: &mut Self::Stream, bytes: usize) { - tracing::trace!("[call] advance_read_cursor"); + tracing::trace!("[advance_read_cursor]"); + + let Some(StreamReadBuffer::Open { ref mut cursor, .. }) = + stream.buffers.as_mut().map(|(r, _)| r) + else { + assert_eq!(bytes, 0); + return + }; + + assert!(cursor.start + bytes <= cursor.end); + cursor.start += bytes; } fn writable_bytes(&self, stream: &mut Self::Stream) -> usize { - tracing::trace!("[call] writable_bytes"); - 1024 + tracing::trace!("[writable_bytes]"); + + let Some(StreamWriteBuffer::Open { ref mut buffer, must_close: false, ..}) = + stream.buffers.as_mut().map(|(_, w)| w) else { return 0 }; + buffer.capacity() - buffer.len() } fn send(&self, stream: &mut Self::Stream, data: &[u8]) { - tracing::trace!("[call] send"); + tracing::trace!("[send] data={:?}", data); - let mut locked = stream - .inner - .lock() - .expect("Mutex should not be poised; qed"); + let Some(StreamWriteBuffer::Open { ref mut buffer, .. } )= + stream.buffers.as_mut().map(|(_, w)| w) else { panic!() }; + buffer.reserve(data.len()); + buffer.extend(data.iter().copied()); - if let Ok(message) = String::from_utf8(data.into()) { - let _ = locked.sender.send(Message::Text(message)); - } + // let mut locked = stream + // .inner + // .lock() + // .expect("Mutex should not be poised; qed"); + + // if let Ok(message) = String::from_utf8(data.into()) { + // let _ = locked.sender.send(Message::Text(message)); + // } } fn close_send(&self, stream: &mut Self::Stream) { - tracing::trace!("[call] close_send"); + tracing::trace!("[close_send]"); + + // It is not illegal to call this on an already-reset stream. + let Some((_, write_buffer)) = stream.buffers.as_mut() else { return }; + + match write_buffer { + StreamWriteBuffer::Open { + must_close: must_close @ false, + .. + } => *must_close = true, + _ => { + // However, it is illegal to call this on a stream that was already close + // attempted. + panic!() + } + } } } -pub struct ConnectionInner { +/// Connection stream of the light-client. +pub struct ConnectionStream { + inner: Arc>, + + /// Read and write buffers of the connection, or `None` if the socket has been reset. + buffers: Option<(StreamReadBuffer, StreamWriteBuffer)>, +} + +/// Safe to implement `Send` in single threaded environments (WASM). +unsafe impl Send for ConnectionStream {} + +/// Inner details of a `ConnectionStream` that represents the web socket. +struct ConnectionInner { sender: SplitSink, receiver: SplitStream, } +/// Safe to implement `Send` in single threaded environments (WASM). unsafe impl Send for ConnectionInner {} -pub struct ConnectionStream { - inner: Arc>, +enum StreamReadBuffer { + Open { + buffer: Vec, + cursor: ops::Range, + }, + Closed, } -unsafe impl Send for ConnectionStream {} +enum StreamWriteBuffer { + Open { + buffer: VecDeque, + must_flush: bool, + must_close: bool, + }, + Closed, +} diff --git a/testing/wasm-tests/tests/wasm.rs b/testing/wasm-tests/tests/wasm.rs index fa41c971c9..c4c7d1d3cb 100644 --- a/testing/wasm-tests/tests/wasm.rs +++ b/testing/wasm-tests/tests/wasm.rs @@ -4,6 +4,9 @@ use subxt::config::PolkadotConfig; use subxt::rpc::LightClient; use wasm_bindgen_test::*; use std::sync::Arc; +use serde_json::value::RawValue; +use subxt::rpc::RpcClientT; +use subxt::rpc::Subscription; wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); @@ -14,15 +17,15 @@ fn init_tracing() { tracing_wasm::set_as_global_default(); } -#[wasm_bindgen_test] -async fn wasm_ws_transport_works() { - let client = subxt::client::OnlineClient::::from_url("ws://127.0.0.1:9944") - .await - .unwrap(); +// #[wasm_bindgen_test] +// async fn wasm_ws_transport_works() { +// let client = subxt::client::OnlineClient::::from_url("ws://127.0.0.1:9944") +// .await +// .unwrap(); - let chain = client.rpc().system_chain().await.unwrap(); - assert_eq!(&chain, "Development"); -} +// let chain = client.rpc().system_chain().await.unwrap(); +// assert_eq!(&chain, "Development"); +// } #[wasm_bindgen_test] async fn light_client_transport_works() { @@ -32,11 +35,32 @@ async fn light_client_transport_works() { let light_client = LightClient::new(include_str!("../../../artifacts/dev_spec.json")).unwrap(); tracing::warn!("RPC layer created.."); - let client = subxt::client::OnlineClient::::from_rpc_client(Arc::new(light_client)).await.unwrap(); + // Note: It is impractical to construct a full subxt `OnlineClient` because the + // light-client must sync with the tip of the chain to fetch the Runtime Version + // needed by subxt. + // The default wasm-bindgen test timeout is 20 seconds, tested locally the + // client does not sync in time with WASM_BINDGEN_TEST_TIMEOUT=1000 (around 16 seconds). - tracing::warn!("Client Created"); + // Test raw RPC method calls. + let chain = light_client.request_raw("system_chain", None).await.unwrap(); + let chain: String = serde_json::from_str(chain.get()).unwrap(); + assert_eq!(&chain, "Development"); + + let param = RawValue::from_string("[0]".to_owned()).expect("Should be valid JSON"); + let genesis = light_client.request_raw("chain_getBlockHash", Some(param)).await.unwrap(); + let genesis: String = serde_json::from_str(genesis.get()).unwrap(); + assert!(genesis.starts_with("0x")); + tracing::warn!("Genesis hash {:?}", genesis); - // let chain = client.rpc().system_chain().await.unwrap(); - // assert_eq!(&chain, "Development"); + // Ensure light-client functions with subscriptions. + let sub = light_client.subscribe_raw("chain_subscribeAllHeads", None, "chain_unsubscribeAllHeads").await.unwrap(); + // The subscription result is actually a PolkadotConfig::Header, we are interested in iteration. + let mut sub: Subscription = Subscription::new(sub); + + let block = sub.next().await.expect("Subscription failed").expect("Subscription ended"); + tracing::warn!("Block hash {:?}", block); + + let block = sub.next().await.expect("Subscription failed").expect("Subscription ended"); + tracing::warn!("Block hash {:?}", block); }