From dcd0d368a3e56aac93d79fc03fc0ce1aefe3c48b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 7 Jun 2023 18:00:18 +0300 Subject: [PATCH] backup Signed-off-by: Alexandru Vasile --- subxt/src/rpc/lightclient/platform.rs | 153 ++++++++++++++++++++++++-- testing/wasm-tests/tests/wasm.rs | 40 +++++-- 2 files changed, 169 insertions(+), 24 deletions(-) diff --git a/subxt/src/rpc/lightclient/platform.rs b/subxt/src/rpc/lightclient/platform.rs index 065eef0ec5..fd673e8a0a 100644 --- a/subxt/src/rpc/lightclient/platform.rs +++ b/subxt/src/rpc/lightclient/platform.rs @@ -14,6 +14,8 @@ use smoldot_light::platform::PlatformSubstreamDirection; use std::sync::Arc; use std::sync::Mutex; +// use futures::lock::Mutex; + use std::task::Context; use std::{ io::IoSlice, @@ -198,6 +200,7 @@ impl smoldot_light::platform::PlatformRef for Platform { let (sender, receiver) = websocket.split(); let conn = ConnectionStream { + // inner: Arc::new(Mutex::new(socket)), inner: Arc::new(Mutex::new(ConnectionInner { sender, receiver })), buffers: Some(( StreamReadBuffer::Open { @@ -239,6 +242,117 @@ impl smoldot_light::platform::PlatformRef for Platform { use futures::Future; + // Box::pin(async move { + // let Some((read_buffer, write_buffer)) = stream.buffers.as_mut() else { + // tracing::trace!("[update_stream] Buffers are empty"); + // return + // }; + + // let mut locked = stream.inner.lock().await; + + // if let StreamReadBuffer::Open { + // buffer: ref mut buf, + // ref mut cursor, + // } = read_buffer + // { + // tracing::trace!("[update_stream] StreamReadBuffer is open"); + + // // When reading data from the socket, `poll_read` might return "EOF". In that + // // situation, we transition to the `Closed` state, which would discard the data + // // currently in the buffer. For this reason, we only try to read if there is no + // // data left in the buffer. + // if cursor.start == cursor.end { + // let result = locked.receiver.next().await; + + // match result { + // Some(Ok(message)) => { + // tracing::trace!( + // "[update_stream] Received from socket message={:?}", + // message + // ); + // // These bytes must end-up in the read buffer. + // let bytes = match message { + // Message::Text(text) => text.into_bytes(), + // Message::Bytes(bytes) => bytes, + // }; + + // for (index, byte) in bytes.iter().enumerate() { + // buf[index] = *byte; + // } + + // *cursor = 0..bytes.len(); + // } + // Some(Err(err)) => { + // tracing::warn!("[update_stream] Reached Websocket error: {:?}", err); + + // stream.buffers = None; + // return; + // } + // None => { + // tracing::warn!("[update_stream] Reached EOF"); + // // EOF. + // *read_buffer = StreamReadBuffer::Closed; + // } + // }; + // } + // } + + // if let StreamWriteBuffer::Open { + // buffer: ref mut buf, + // must_flush, + // must_close, + // } = write_buffer + // { + // while !buf.is_empty() { + // let write_queue_slices = buf.as_slices(); + // let len = write_queue_slices.0.len() + write_queue_slices.1.len(); + + // let slices = &[ + // IoSlice::new(write_queue_slices.0), + // IoSlice::new(write_queue_slices.1), + // ]; + + // let message_str = String::from_utf8(write_queue_slices.0.to_vec()); + // tracing::trace!( + // "[update_stream] Prepare to send first={:?} message={:?}", + // write_queue_slices.0, + // message_str + // ); + + // tracing::trace!( + // "[update_stream] Prepare to send second={:?}", + // write_queue_slices.1 + // ); + + // let len = write_queue_slices.1.len(); + // let message = Message::Bytes(write_queue_slices.0.to_owned()); + + // tracing::trace!("[update_stream] Sending={:?} len={}", message, len); + + // let result = locked.sender.send(message).await; + + // let mut stream_send = locked.sender.send(message); + // match result { + // Err(err) => { + // tracing::trace!("[update_stream] Sending Error {:?}", err); + + // // End the stream. + // stream.buffers = None; + // return; + // } + // Ok(_) => { + // tracing::trace!("[update_stream] Sending ok"); + + // *must_flush = true; + // for _ in 0..len { + // buf.pop_front(); + // } + // } + // } + // } + // } + // }) + Box::pin(future::poll_fn(|cx| { let Some((read_buffer, write_buffer)) = stream.buffers.as_mut() else { tracing::trace!("[update_stream] Buffers are empty"); @@ -246,7 +360,6 @@ impl smoldot_light::platform::PlatformRef for Platform { }; let mut locked = stream.inner.lock().unwrap(); - // Whether the future returned by `update_stream` should return `Ready` or `Pending`. let mut update_stream_future_ready = false; @@ -300,6 +413,8 @@ impl smoldot_light::platform::PlatformRef for Platform { } } } + } else { + tracing::trace!("[update_stream] No need to read from socket"); } } @@ -318,9 +433,11 @@ impl smoldot_light::platform::PlatformRef for Platform { IoSlice::new(write_queue_slices.1), ]; + let message_str = String::from_utf8(write_queue_slices.0.to_vec()); tracing::trace!( - "[update_stream] Prepare to send first={:?}", - write_queue_slices.0 + "[update_stream] Prepare to send first={:?} message={:?}", + write_queue_slices.0, + message_str ); tracing::trace!( @@ -328,13 +445,12 @@ impl smoldot_light::platform::PlatformRef for Platform { write_queue_slices.1 ); - let len = write_queue_slices.1.len(); - let message = Message::Bytes(write_queue_slices.1.to_owned()); + let len = write_queue_slices.0.len(); + let message = Message::Bytes(write_queue_slices.0.to_owned()); tracing::trace!("[update_stream] Sending={:?} len={}", message, len); let mut stream_send = locked.sender.send(message); - if let Poll::Ready(result) = Pin::new(&mut stream_send).poll(cx) { if !*must_close { // In the situation where the API user wants to close the writing @@ -365,7 +481,11 @@ impl smoldot_light::platform::PlatformRef for Platform { } } - // if buf.is_empty() && *must_close { + if buf.is_empty() && *must_close { + tracing::trace!("[update_stream] MUST poll_close",); + } else if *must_flush { + tracing::trace!("[update_stream] MUST poll_flush",); + } // if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_close(cx) { // update_stream_future_ready = true; // match result { @@ -406,9 +526,6 @@ impl smoldot_light::platform::PlatformRef for Platform { Poll::Pending } })) - // Box::pin(async move { - - // }) } fn read_buffer<'a>( @@ -488,15 +605,23 @@ impl smoldot_light::platform::PlatformRef for Platform { } fn advance_read_cursor(&self, stream: &mut Self::Stream, bytes: usize) { - tracing::trace!("[advance_read_cursor]"); + tracing::trace!("[advance_read_cursor] bytes={:?}", bytes); - let Some(StreamReadBuffer::Open { ref mut cursor, .. }) = + let Some(StreamReadBuffer::Open { ref mut cursor, buffer }) = stream.buffers.as_mut().map(|(r, _)| r) else { assert_eq!(bytes, 0); return }; + let message_slice = &buffer[cursor.start..cursor.start + bytes]; + let message_str = String::from_utf8(message_slice.to_vec()); + tracing::trace!( + "[advance_read_cursor] message bytes={:?} message={:?}", + message_slice, + message_str, + ); + assert!(cursor.start + bytes <= cursor.end); cursor.start += bytes; } @@ -583,3 +708,7 @@ enum StreamWriteBuffer { }, Closed, } + +// pub struct TcpStream { +// socket: Rc>, +// } diff --git a/testing/wasm-tests/tests/wasm.rs b/testing/wasm-tests/tests/wasm.rs index c4c7d1d3cb..f96c832c41 100644 --- a/testing/wasm-tests/tests/wasm.rs +++ b/testing/wasm-tests/tests/wasm.rs @@ -1,20 +1,20 @@ #![cfg(target_arch = "wasm32")] +use serde_json::value::RawValue; +use std::sync::Arc; use subxt::config::PolkadotConfig; use subxt::rpc::LightClient; -use wasm_bindgen_test::*; -use std::sync::Arc; -use serde_json::value::RawValue; use subxt::rpc::RpcClientT; use subxt::rpc::Subscription; +use wasm_bindgen_test::*; wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); /// Run the tests by `$ wasm-pack test --firefox --headless` fn init_tracing() { - console_error_panic_hook::set_once(); - tracing_wasm::set_as_global_default(); + console_error_panic_hook::set_once(); + tracing_wasm::set_as_global_default(); } // #[wasm_bindgen_test] @@ -29,7 +29,7 @@ fn init_tracing() { #[wasm_bindgen_test] async fn light_client_transport_works() { - init_tracing(); + init_tracing(); tracing::warn!("Starting test"); let light_client = LightClient::new(include_str!("../../../artifacts/dev_spec.json")).unwrap(); @@ -42,25 +42,41 @@ async fn light_client_transport_works() { // client does not sync in time with WASM_BINDGEN_TEST_TIMEOUT=1000 (around 16 seconds). // Test raw RPC method calls. - let chain = light_client.request_raw("system_chain", None).await.unwrap(); + let chain = light_client + .request_raw("system_chain", None) + .await + .unwrap(); let chain: String = serde_json::from_str(chain.get()).unwrap(); assert_eq!(&chain, "Development"); let param = RawValue::from_string("[0]".to_owned()).expect("Should be valid JSON"); - let genesis = light_client.request_raw("chain_getBlockHash", Some(param)).await.unwrap(); + let genesis = light_client + .request_raw("chain_getBlockHash", Some(param)) + .await + .unwrap(); let genesis: String = serde_json::from_str(genesis.get()).unwrap(); assert!(genesis.starts_with("0x")); tracing::warn!("Genesis hash {:?}", genesis); - // Ensure light-client functions with subscriptions. - let sub = light_client.subscribe_raw("chain_subscribeAllHeads", None, "chain_unsubscribeAllHeads").await.unwrap(); + let sub = light_client + .subscribe_raw("chain_subscribeAllHeads", None, "chain_unsubscribeAllHeads") + .await + .unwrap(); // The subscription result is actually a PolkadotConfig::Header, we are interested in iteration. let mut sub: Subscription = Subscription::new(sub); - let block = sub.next().await.expect("Subscription failed").expect("Subscription ended"); + let block = sub + .next() + .await + .expect("Subscription failed") + .expect("Subscription ended"); tracing::warn!("Block hash {:?}", block); - let block = sub.next().await.expect("Subscription failed").expect("Subscription ended"); + let block = sub + .next() + .await + .expect("Subscription failed") + .expect("Subscription ended"); tracing::warn!("Block hash {:?}", block); }