Compile Platform impl

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2023-06-05 15:52:05 +03:00
parent b70beaf7a4
commit 853a71ec24
9 changed files with 1015 additions and 494 deletions
+171 -77
View File
@@ -4,10 +4,15 @@ use futures_util::{future, FutureExt};
use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError};
use smoldot::libp2p::multiaddr::{Multiaddr, ProtocolRef};
use futures::SinkExt;
use futures_util::stream::{SplitSink, SplitStream, StreamExt};
use futures_util::lock::Mutex;
use smoldot_light::platform::ConnectError;
use smoldot_light::platform::PlatformConnection;
use smoldot_light::platform::PlatformSubstreamDirection;
use std::sync::Arc;
use std::sync::Mutex;
use std::{
io::IoSlice,
net::{IpAddr, SocketAddr},
@@ -34,13 +39,25 @@ impl smoldot_light::platform::PlatformRef for Platform {
type Connection = std::convert::Infallible;
type Stream = ConnectionSocket;
type Stream = ConnectionStream;
type ConnectFuture = future::BoxFuture<'static, Result<ConnectionSocket, ConnectError>>;
type ConnectFuture = future::BoxFuture<
'static,
Result<PlatformConnection<Self::Stream, Self::Connection>, ConnectError>,
>;
// type ConnectFuture = future::BoxFuture<
// 'static,
// Result<
// smoldot_light::platform::PlatformConnection<Self::Stream, Self::Connection>,
// ConnectError,
// >,
// >;
type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
type NextSubstreamFuture<'a> = future::Pending<()>;
type NextSubstreamFuture<'a> =
future::Pending<Option<(Self::Stream, PlatformSubstreamDirection)>>;
fn now_from_unix_epoch(&self) -> std::time::Duration {
std::time::UNIX_EPOCH
@@ -82,7 +99,11 @@ impl smoldot_light::platform::PlatformRef for Platform {
}
fn connect(&self, url: &str) -> Self::ConnectFuture {
let url = url.to_string();
Box::pin(async move {
// let url = url.to_string();
let multiaddr = url.parse::<Multiaddr>().map_err(|_| ConnectError {
message: format!("Address {url} is not a valid multiaddress"),
is_bad_addr: true,
@@ -142,122 +163,195 @@ impl smoldot_light::platform::PlatformRef for Platform {
message: "Cannot stablish WebSocket connection".to_string(),
})?;
let (to_sender, from_sender) = mpsc::channel(1024);
let (to_receiver, from_receiver) = mpsc::channel(1024);
// let (to_sender, from_sender) = mpsc::channel(1024);
// let (to_receiver, from_receiver) = mpsc::channel(1024);
// TODO: Spawn a task:
// enun Protocol
/*
enum Protocol {
Send(String),
Recv(String),
}
bg_task {
while ..
Future:
A: if Send() .. self.send();
B: self.next() -> recv_end.send("Msg");
}
from fn send(&self, stream: &mut Self::Stream, data: &[u8]) {
let str: String = data.into();
sender.send(str);
}
from fn read_buffer<'a> {
}
*
*/
// Note: WebSocket is not `Send`, work around that with a spawned task.
wasm_bindgen_futures::spawn_local(async move {
let backend_event = tokio_stream::wrappers::ReceiverStream::new(from_sender);
let rpc_responses_event =
futures_util::stream::unfold(rpc_responses, |mut rpc_responses| async {
rpc_responses
.next()
.await
.map(|result| (result, rpc_responses))
});
// wasm_bindgen_futures::spawn_local(async move {
// let sender = tokio_stream::wrappers::ReceiverStream::new(from_sender);
// let receiver = tokio_stream::wrappers::ReceiverStream::new(from_receiver);
tokio::pin!(backend_event, rpc_responses_event);
// // let rpc_responses_event =
// // futures_util::stream::unfold(rpc_responses, |mut rpc_responses| async {
// // rpc_responses
// // .next()
// // .await
// // .map(|result| (result, rpc_responses))
// // });
let mut backend_event_fut = backend_event.next();
let mut rpc_responses_fut = rpc_responses_event.next();
// tokio::pin!(backend_event, rpc_responses_event);
loop {
match future::select(backend_event_fut, rpc_responses_fut).await {
// Message received from the backend: user registered.
Either::Left((backend_value, previous_fut)) => {
let Some(message) = backend_value else {
tracing::trace!(target: LOG_TARGET, "Frontend channel closed");
break;
};
tracing::trace!(
target: LOG_TARGET,
"Received register message {:?}",
message
);
// let mut backend_event_fut = backend_event.next();
// let mut rpc_responses_fut = rpc_responses_event.next();
self.handle_register(message).await;
// loop {
// match future::select(backend_event_fut, rpc_responses_fut).await {
// // Message received from the backend: user registered.
// Either::Left((backend_value, previous_fut)) => {
// let Some(message) = backend_value else {
// println!("Frontend channel closed");
// break;
// };
// tracing::trace!(
// target: LOG_TARGET,
// "Received register message {:?}",
// message
// );
backend_event_fut = backend_event.next();
rpc_responses_fut = previous_fut;
}
// Message received from rpc handler: lightclient response.
Either::Right((response, previous_fut)) => {
// Smoldot returns `None` if the chain has been removed (which subxt does not remove).
let Some(response) = response else {
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
break;
};
tracing::trace!(
target: LOG_TARGET,
"Received smoldot RPC result {:?}",
response
);
// self.handle_register(message).await;
self.handle_rpc_response(response).await;
// backend_event_fut = backend_event.next();
// rpc_responses_fut = previous_fut;
// }
// // Message received from rpc handler: lightclient response.
// Either::Right((response, previous_fut)) => {
// // Smoldot returns `None` if the chain has been removed (which subxt does not remove).
// let Some(response) = response else {
// println!("Smoldot RPC responses channel closed");
// break;
// };
// println!("Received smoldot RPC result {:?}", response);
// Advance backend, save frontend.
backend_event_fut = previous_fut;
rpc_responses_fut = rpc_responses_event.next();
}
}
}
});
// self.handle_rpc_response(response).await;
// // Advance backend, save frontend.
// backend_event_fut = previous_fut;
// rpc_responses_fut = rpc_responses_event.next();
// }
// }
// }
// });
let (sender, receiver) = websocket.split();
Ok(ConnectionSocket { sender, receiver })
let conn = ConnectionStream {
inner: Arc::new(Mutex::new(ConnectionInner { sender, receiver })),
};
Ok(PlatformConnection::SingleStreamMultistreamSelectNoiseYamux(
conn,
))
})
}
fn open_out_substream(&self, _connection: &mut Self::Connection) {
// Called from MultiStream connections that are not supported.
// Called from MultiStream connections that are never opened for this implementation.
}
fn next_substream<'a>(
&self,
connection: &'a mut Self::Connection,
) -> Self::NextSubstreamFuture<'a> {
// Called from MultiStream connections that are not supported.
// Called from MultiStream connections that are never opened for this implementation.
// futures::future::pending::<Option<(ConnectionStream, PlatformSubstreamDirection)>>()
futures::future::pending()
}
fn update_stream<'a>(&self, stream: &'a mut Self::Stream) -> Self::StreamUpdateFuture<'a> {
todo!()
Box::pin(async move {})
}
fn read_buffer<'a>(
&self,
stream: &'a mut Self::Stream,
) -> smoldot_light::platform::ReadBuffer<'a> {
todo!()
let mut locked = stream
.inner
.lock()
.expect("Mutex should not be poised; qed");
// let fut = locked.receiver.next();
// let msg = futures_executor::block_on(async {
let msg = futures::executor::block_on(async {
match locked.receiver.next().await {
Some(Ok(msg)) => Some(msg),
_ => None,
}
});
match msg {
Some(msg) => {
let msg = Box::leak(Box::new(msg));
match msg {
Message::Text(text) => {
smoldot_light::platform::ReadBuffer::Open(text.as_bytes())
}
Message::Bytes(bytes) => smoldot_light::platform::ReadBuffer::Open(bytes),
}
}
None => smoldot_light::platform::ReadBuffer::Closed,
}
}
fn advance_read_cursor(&self, stream: &mut Self::Stream, bytes: usize) {
todo!()
}
fn advance_read_cursor(&self, stream: &mut Self::Stream, bytes: usize) {}
fn writable_bytes(&self, stream: &mut Self::Stream) -> usize {
todo!()
1024
}
fn send(&self, stream: &mut Self::Stream, data: &[u8]) {
todo!()
let mut locked = stream
.inner
.lock()
.expect("Mutex should not be poised; qed");
if let Ok(message) = String::from_utf8(data.into()) {
let _ = locked.sender.send(Message::Text(message));
}
}
fn close_send(&self, stream: &mut Self::Stream) {
todo!()
}
fn close_send(&self, stream: &mut Self::Stream) {}
}
/// Error potentially returned by [`PlatformRef::connect`].
pub struct ConnectError {
/// Human-readable error message.
pub message: String,
/// `true` if the error is caused by the address to connect to being forbidden or unsupported.
pub is_bad_addr: bool,
// pub struct ConnectError {
// /// Human-readable error message.
// pub message: String,
// /// `true` if the error is caused by the address to connect to being forbidden or unsupported.
// pub is_bad_addr: bool,
// }
pub struct ConnectionInner {
sender: SplitSink<WebSocket, Message>,
receiver: SplitStream<WebSocket>,
}
pub struct ConnectionSocket {
sender: SplitSink<WebSocket, Message>,
receiver: SSplitStream<WebSocket>,
unsafe impl Send for ConnectionInner {}
pub struct ConnectionStream {
inner: Arc<Mutex<ConnectionInner>>,
}
unsafe impl Send for ConnectionStream {}