Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2023-06-07 18:00:18 +03:00
parent e11604411d
commit dcd0d368a3
2 changed files with 169 additions and 24 deletions
+141 -12
View File
@@ -14,6 +14,8 @@ use smoldot_light::platform::PlatformSubstreamDirection;
use std::sync::Arc;
use std::sync::Mutex;
// use futures::lock::Mutex;
use std::task::Context;
use std::{
io::IoSlice,
@@ -198,6 +200,7 @@ impl smoldot_light::platform::PlatformRef for Platform {
let (sender, receiver) = websocket.split();
let conn = ConnectionStream {
// inner: Arc::new(Mutex::new(socket)),
inner: Arc::new(Mutex::new(ConnectionInner { sender, receiver })),
buffers: Some((
StreamReadBuffer::Open {
@@ -239,6 +242,117 @@ impl smoldot_light::platform::PlatformRef for Platform {
use futures::Future;
// Box::pin(async move {
// let Some((read_buffer, write_buffer)) = stream.buffers.as_mut() else {
// tracing::trace!("[update_stream] Buffers are empty");
// return
// };
// let mut locked = stream.inner.lock().await;
// if let StreamReadBuffer::Open {
// buffer: ref mut buf,
// ref mut cursor,
// } = read_buffer
// {
// tracing::trace!("[update_stream] StreamReadBuffer is open");
// // When reading data from the socket, `poll_read` might return "EOF". In that
// // situation, we transition to the `Closed` state, which would discard the data
// // currently in the buffer. For this reason, we only try to read if there is no
// // data left in the buffer.
// if cursor.start == cursor.end {
// let result = locked.receiver.next().await;
// match result {
// Some(Ok(message)) => {
// tracing::trace!(
// "[update_stream] Received from socket message={:?}",
// message
// );
// // These bytes must end-up in the read buffer.
// let bytes = match message {
// Message::Text(text) => text.into_bytes(),
// Message::Bytes(bytes) => bytes,
// };
// for (index, byte) in bytes.iter().enumerate() {
// buf[index] = *byte;
// }
// *cursor = 0..bytes.len();
// }
// Some(Err(err)) => {
// tracing::warn!("[update_stream] Reached Websocket error: {:?}", err);
// stream.buffers = None;
// return;
// }
// None => {
// tracing::warn!("[update_stream] Reached EOF");
// // EOF.
// *read_buffer = StreamReadBuffer::Closed;
// }
// };
// }
// }
// if let StreamWriteBuffer::Open {
// buffer: ref mut buf,
// must_flush,
// must_close,
// } = write_buffer
// {
// while !buf.is_empty() {
// let write_queue_slices = buf.as_slices();
// let len = write_queue_slices.0.len() + write_queue_slices.1.len();
// let slices = &[
// IoSlice::new(write_queue_slices.0),
// IoSlice::new(write_queue_slices.1),
// ];
// let message_str = String::from_utf8(write_queue_slices.0.to_vec());
// tracing::trace!(
// "[update_stream] Prepare to send first={:?} message={:?}",
// write_queue_slices.0,
// message_str
// );
// tracing::trace!(
// "[update_stream] Prepare to send second={:?}",
// write_queue_slices.1
// );
// let len = write_queue_slices.1.len();
// let message = Message::Bytes(write_queue_slices.0.to_owned());
// tracing::trace!("[update_stream] Sending={:?} len={}", message, len);
// let result = locked.sender.send(message).await;
// let mut stream_send = locked.sender.send(message);
// match result {
// Err(err) => {
// tracing::trace!("[update_stream] Sending Error {:?}", err);
// // End the stream.
// stream.buffers = None;
// return;
// }
// Ok(_) => {
// tracing::trace!("[update_stream] Sending ok");
// *must_flush = true;
// for _ in 0..len {
// buf.pop_front();
// }
// }
// }
// }
// }
// })
Box::pin(future::poll_fn(|cx| {
let Some((read_buffer, write_buffer)) = stream.buffers.as_mut() else {
tracing::trace!("[update_stream] Buffers are empty");
@@ -246,7 +360,6 @@ impl smoldot_light::platform::PlatformRef for Platform {
};
let mut locked = stream.inner.lock().unwrap();
// Whether the future returned by `update_stream` should return `Ready` or `Pending`.
let mut update_stream_future_ready = false;
@@ -300,6 +413,8 @@ impl smoldot_light::platform::PlatformRef for Platform {
}
}
}
} else {
tracing::trace!("[update_stream] No need to read from socket");
}
}
@@ -318,9 +433,11 @@ impl smoldot_light::platform::PlatformRef for Platform {
IoSlice::new(write_queue_slices.1),
];
let message_str = String::from_utf8(write_queue_slices.0.to_vec());
tracing::trace!(
"[update_stream] Prepare to send first={:?}",
write_queue_slices.0
"[update_stream] Prepare to send first={:?} message={:?}",
write_queue_slices.0,
message_str
);
tracing::trace!(
@@ -328,13 +445,12 @@ impl smoldot_light::platform::PlatformRef for Platform {
write_queue_slices.1
);
let len = write_queue_slices.1.len();
let message = Message::Bytes(write_queue_slices.1.to_owned());
let len = write_queue_slices.0.len();
let message = Message::Bytes(write_queue_slices.0.to_owned());
tracing::trace!("[update_stream] Sending={:?} len={}", message, len);
let mut stream_send = locked.sender.send(message);
if let Poll::Ready(result) = Pin::new(&mut stream_send).poll(cx) {
if !*must_close {
// In the situation where the API user wants to close the writing
@@ -365,7 +481,11 @@ impl smoldot_light::platform::PlatformRef for Platform {
}
}
// if buf.is_empty() && *must_close {
if buf.is_empty() && *must_close {
tracing::trace!("[update_stream] MUST poll_close",);
} else if *must_flush {
tracing::trace!("[update_stream] MUST poll_flush",);
}
// if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_close(cx) {
// update_stream_future_ready = true;
// match result {
@@ -406,9 +526,6 @@ impl smoldot_light::platform::PlatformRef for Platform {
Poll::Pending
}
}))
// Box::pin(async move {
// })
}
fn read_buffer<'a>(
@@ -488,15 +605,23 @@ impl smoldot_light::platform::PlatformRef for Platform {
}
fn advance_read_cursor(&self, stream: &mut Self::Stream, bytes: usize) {
tracing::trace!("[advance_read_cursor]");
tracing::trace!("[advance_read_cursor] bytes={:?}", bytes);
let Some(StreamReadBuffer::Open { ref mut cursor, .. }) =
let Some(StreamReadBuffer::Open { ref mut cursor, buffer }) =
stream.buffers.as_mut().map(|(r, _)| r)
else {
assert_eq!(bytes, 0);
return
};
let message_slice = &buffer[cursor.start..cursor.start + bytes];
let message_str = String::from_utf8(message_slice.to_vec());
tracing::trace!(
"[advance_read_cursor] message bytes={:?} message={:?}",
message_slice,
message_str,
);
assert!(cursor.start + bytes <= cursor.end);
cursor.start += bytes;
}
@@ -583,3 +708,7 @@ enum StreamWriteBuffer {
},
Closed,
}
// pub struct TcpStream {
// socket: Rc<RefCell<WebSocket>>,
// }
+28 -12
View File
@@ -1,20 +1,20 @@
#![cfg(target_arch = "wasm32")]
use serde_json::value::RawValue;
use std::sync::Arc;
use subxt::config::PolkadotConfig;
use subxt::rpc::LightClient;
use wasm_bindgen_test::*;
use std::sync::Arc;
use serde_json::value::RawValue;
use subxt::rpc::RpcClientT;
use subxt::rpc::Subscription;
use wasm_bindgen_test::*;
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
/// Run the tests by `$ wasm-pack test --firefox --headless`
fn init_tracing() {
console_error_panic_hook::set_once();
tracing_wasm::set_as_global_default();
console_error_panic_hook::set_once();
tracing_wasm::set_as_global_default();
}
// #[wasm_bindgen_test]
@@ -29,7 +29,7 @@ fn init_tracing() {
#[wasm_bindgen_test]
async fn light_client_transport_works() {
init_tracing();
init_tracing();
tracing::warn!("Starting test");
let light_client = LightClient::new(include_str!("../../../artifacts/dev_spec.json")).unwrap();
@@ -42,25 +42,41 @@ async fn light_client_transport_works() {
// client does not sync in time with WASM_BINDGEN_TEST_TIMEOUT=1000 (around 16 seconds).
// Test raw RPC method calls.
let chain = light_client.request_raw("system_chain", None).await.unwrap();
let chain = light_client
.request_raw("system_chain", None)
.await
.unwrap();
let chain: String = serde_json::from_str(chain.get()).unwrap();
assert_eq!(&chain, "Development");
let param = RawValue::from_string("[0]".to_owned()).expect("Should be valid JSON");
let genesis = light_client.request_raw("chain_getBlockHash", Some(param)).await.unwrap();
let genesis = light_client
.request_raw("chain_getBlockHash", Some(param))
.await
.unwrap();
let genesis: String = serde_json::from_str(genesis.get()).unwrap();
assert!(genesis.starts_with("0x"));
tracing::warn!("Genesis hash {:?}", genesis);
// Ensure light-client functions with subscriptions.
let sub = light_client.subscribe_raw("chain_subscribeAllHeads", None, "chain_unsubscribeAllHeads").await.unwrap();
let sub = light_client
.subscribe_raw("chain_subscribeAllHeads", None, "chain_unsubscribeAllHeads")
.await
.unwrap();
// The subscription result is actually a PolkadotConfig::Header, we are interested in iteration.
let mut sub: Subscription<String> = Subscription::new(sub);
let block = sub.next().await.expect("Subscription failed").expect("Subscription ended");
let block = sub
.next()
.await
.expect("Subscription failed")
.expect("Subscription ended");
tracing::warn!("Block hash {:?}", block);
let block = sub.next().await.expect("Subscription failed").expect("Subscription ended");
let block = sub
.next()
.await
.expect("Subscription failed")
.expect("Subscription ended");
tracing::warn!("Block hash {:?}", block);
}