Add light client platform WASM compatible (#1026)

* Cargo update in prep for wasm build

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add light client test

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Implement low level socket

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add native platform primitives

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add wasm platform primitives

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Implement smoldot platform

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Adjust code to use custom platform

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Adjust feature flags

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* tests: Adjust wasm endpoint to accept ws for p2p

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Adjust wasm socket

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Book mention of wasm

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ci: Propagate env variable properly

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Revert to native feature flags

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* cli: Use tokio rt-multi-thread feature

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Add tokio feature flags for native platform

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm: Use polkadot live for wasm testing

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm: Add support for DNS p2p addresses

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm: Disable logs

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm: Run wasm test for firefox driver

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm: Reenable chrome driver

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Move lightclient RPC to dedicated crate for better feature flags and modularity

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Use subxt-lightclient low level RPC crate

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Apply cargo fmt

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Enable default:native feature for cargo check

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ci: Extra step for subxt-lightclient similar to signer crate

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Remove native platform code and use smoldot instead

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* codegen: Enable tokio/multi-threads

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Refactor modules

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Adjust testing crates

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ci: Run light-client WASM tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm-rpc: Remove light-client imports

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* testing: Update wasm cargo.lock files

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* ci: Spawn substrate node with deterministic p2p address for WASM tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm_socket: Use rc and refcell

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* wasm_socket: Switch back to Arc<Mutex<>>

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Add comments

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2023-07-18 14:27:16 +03:00
committed by GitHub
parent ab2f2a8cdf
commit 4bda673847
28 changed files with 5280 additions and 305 deletions
+437
View File
@@ -0,0 +1,437 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use futures::stream::StreamExt;
use futures_util::future::{self, Either};
use serde::Deserialize;
use serde_json::value::RawValue;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::{mpsc, oneshot};
use super::platform::PlatformType;
use super::LightClientRpcError;
use smoldot_light::ChainId;
const LOG_TARGET: &str = "light-client-background";
/// The response of an RPC method.
pub type MethodResponse = Result<Box<RawValue>, LightClientRpcError>;
/// Message protocol between the front-end client that submits the RPC requests
/// and the backend handler that produces responses from the chain.
///
/// The light client uses a single object [`smoldot_light::JsonRpcResponses`] to
/// handle all requests and subscriptions from a chain. A background task is spawned
/// to multiplex the rpc responses and to provide them back to their rightful submitters.
#[derive(Debug)]
pub enum FromSubxt {
/// The RPC method request.
Request {
/// The method of the request.
method: String,
/// The parameters of the request.
params: String,
/// Channel used to send back the result.
sender: oneshot::Sender<MethodResponse>,
},
/// The RPC subscription (pub/sub) request.
Subscription {
/// The method of the request.
method: String,
/// The parameters of the request.
params: String,
/// Channel used to send back the subscription ID if successful.
sub_id: oneshot::Sender<MethodResponse>,
/// Channel used to send back the notifcations.
sender: mpsc::UnboundedSender<Box<RawValue>>,
},
}
/// Background task data.
pub struct BackgroundTask {
/// Smoldot light client implementation that leverages the exposed platform.
client: smoldot_light::Client<PlatformType>,
/// The ID of the chain used to identify the chain protocol (ie. substrate).
///
/// Note: A single chain is supported for a client. This aligns with the subxt's
/// vision of the Client.
chain_id: ChainId,
/// Unique ID for RPC calls.
request_id: usize,
/// Map the request ID of a RPC method to the frontend `Sender`.
requests: HashMap<usize, oneshot::Sender<MethodResponse>>,
/// Subscription calls first need to make a plain RPC method
/// request to obtain the subscription ID.
///
/// The RPC method request is made in the background and the response should
/// not be sent back to the user.
/// Map the request ID of a RPC method to the frontend `Sender`.
id_to_subscription: HashMap<
usize,
(
oneshot::Sender<MethodResponse>,
mpsc::UnboundedSender<Box<RawValue>>,
),
>,
/// Map the subscription ID to the frontend `Sender`.
subscriptions: HashMap<usize, mpsc::UnboundedSender<Box<RawValue>>>,
}
impl BackgroundTask {
/// Constructs a new [`BackgroundTask`].
pub fn new(client: smoldot_light::Client<PlatformType>, chain_id: ChainId) -> BackgroundTask {
BackgroundTask {
client,
chain_id,
request_id: 1,
requests: Default::default(),
id_to_subscription: Default::default(),
subscriptions: Default::default(),
}
}
/// Fetch and increment the request ID.
fn next_id(&mut self) -> usize {
let next = self.request_id;
self.request_id = self.request_id.wrapping_add(1);
next
}
/// Handle the registration messages received from the user.
async fn handle_requests(&mut self, message: FromSubxt) {
match message {
FromSubxt::Request {
method,
params,
sender,
} => {
let id = self.next_id();
let request = format!(
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
id, method, params
);
self.requests.insert(id, sender);
let result = self.client.json_rpc_request(request, self.chain_id);
if let Err(err) = result {
tracing::warn!(
target: LOG_TARGET,
"Cannot send RPC request to lightclient {:?}",
err.to_string()
);
let sender = self
.requests
.remove(&id)
.expect("Channel is inserted above; qed");
// Send the error back to frontend.
if sender
.send(Err(LightClientRpcError::Request(err.to_string())))
.is_err()
{
tracing::warn!(
target: LOG_TARGET,
"Cannot send RPC request error to id={id}",
);
}
}
}
FromSubxt::Subscription {
method,
params,
sub_id,
sender,
} => {
// For subscriptions we need to make a plain RPC request to the subscription method.
// The server will return as a result the subscription ID.
let id = self.next_id();
let request = format!(
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
id, method, params
);
self.id_to_subscription.insert(id, (sub_id, sender));
let result = self.client.json_rpc_request(request, self.chain_id);
if let Err(err) = result {
tracing::warn!(
target: LOG_TARGET,
"Cannot send RPC request to lightclient {:?}",
err.to_string()
);
let (sub_id, _) = self
.id_to_subscription
.remove(&id)
.expect("Channels are inserted above; qed");
// Send the error back to frontend.
if sub_id
.send(Err(LightClientRpcError::Request(err.to_string())))
.is_err()
{
tracing::warn!(
target: LOG_TARGET,
"Cannot send RPC request error to id={id}",
);
}
}
}
};
}
/// Parse the response received from the light client and sent it to the appropriate user.
fn handle_rpc_response(&mut self, response: String) {
match RpcResponse::from_str(&response) {
Ok(RpcResponse::Error { id, error }) => {
let Ok(id) = id.parse::<usize>() else {
tracing::warn!(target: LOG_TARGET, "Cannot send error. Id={id} is not a valid number");
return
};
if let Some(sender) = self.requests.remove(&id) {
if sender
.send(Err(LightClientRpcError::Request(error.to_string())))
.is_err()
{
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id={id}",
);
}
} else if let Some((sub_id_sender, _)) = self.id_to_subscription.remove(&id) {
if sub_id_sender
.send(Err(LightClientRpcError::Request(error.to_string())))
.is_err()
{
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id {:?}",
id
);
}
}
}
Ok(RpcResponse::Method { id, result }) => {
let Ok(id) = id.parse::<usize>() else {
tracing::warn!(target: LOG_TARGET, "Cannot send response. Id={id} is not a valid number");
return
};
// Send the response back.
if let Some(sender) = self.requests.remove(&id) {
if sender.send(Ok(result)).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id={id}",
);
}
} else if let Some((sub_id_sender, sender)) = self.id_to_subscription.remove(&id) {
let Ok(sub_id) = result
.get()
.trim_start_matches('"')
.trim_end_matches('"')
.parse::<usize>() else {
tracing::warn!(
target: LOG_TARGET,
"Subscription id={result} is not a valid number",
);
return;
};
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id}");
if sub_id_sender.send(Ok(result)).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id={id}",
);
} else {
// Track this subscription ID if send is successful.
self.subscriptions.insert(sub_id, sender);
}
}
}
Ok(RpcResponse::Subscription { method, id, result }) => {
let Ok(id) = id.parse::<usize>() else {
tracing::warn!(target: LOG_TARGET, "Cannot send subscription. Id={id} is not a valid number");
return
};
if let Some(sender) = self.subscriptions.get_mut(&id) {
// Send the current notification response.
if sender.send(result).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send notification to subscription id={id} method={method}",
);
// Remove the sender if the subscription dropped the receiver.
self.subscriptions.remove(&id);
}
}
}
Err(err) => {
tracing::warn!(target: LOG_TARGET, "cannot decode RPC response {:?}", err);
}
}
}
/// Perform the main background task:
/// - receiving requests from subxt RPC method / subscriptions
/// - provides the results from the light client back to users.
pub async fn start_task(
&mut self,
from_subxt: mpsc::UnboundedReceiver<FromSubxt>,
from_node: smoldot_light::JsonRpcResponses,
) {
let from_subxt_event = tokio_stream::wrappers::UnboundedReceiverStream::new(from_subxt);
let from_node_event = futures_util::stream::unfold(from_node, |mut from_node| async {
from_node.next().await.map(|result| (result, from_node))
});
tokio::pin!(from_subxt_event, from_node_event);
let mut from_subxt_event_fut = from_subxt_event.next();
let mut from_node_event_fut = from_node_event.next();
loop {
match future::select(from_subxt_event_fut, from_node_event_fut).await {
// Message received from subxt.
Either::Left((subxt_message, previous_fut)) => {
let Some(message) = subxt_message else {
tracing::trace!(target: LOG_TARGET, "Subxt channel closed");
break;
};
tracing::trace!(
target: LOG_TARGET,
"Received register message {:?}",
message
);
self.handle_requests(message).await;
from_subxt_event_fut = from_subxt_event.next();
from_node_event_fut = previous_fut;
}
// Message received from rpc handler: lightclient response.
Either::Right((node_message, previous_fut)) => {
// Smoldot returns `None` if the chain has been removed (which subxt does not remove).
let Some(response) = node_message else {
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
break;
};
tracing::trace!(
target: LOG_TARGET,
"Received smoldot RPC result {:?}",
response
);
self.handle_rpc_response(response);
// Advance backend, save frontend.
from_subxt_event_fut = previous_fut;
from_node_event_fut = from_node_event.next();
}
}
}
tracing::trace!(target: LOG_TARGET, "Task closed");
}
}
/// The RPC response from the light-client.
/// This can either be a response of a method, or a notification from a subscription.
#[derive(Debug, Clone)]
enum RpcResponse {
Method {
/// Response ID.
id: String,
/// The result of the method call.
result: Box<RawValue>,
},
Subscription {
/// RPC method that generated the notification.
method: String,
/// Subscription ID.
id: String,
/// Result.
result: Box<RawValue>,
},
Error {
/// Response ID.
id: String,
/// Error.
error: Box<RawValue>,
},
}
impl std::str::FromStr for RpcResponse {
type Err = serde_json::Error;
fn from_str(response: &str) -> Result<Self, Self::Err> {
// Helper structures to deserialize from raw RPC strings.
#[derive(Deserialize, Debug)]
struct Response {
/// JSON-RPC version.
#[allow(unused)]
jsonrpc: String,
/// Result.
result: Box<RawValue>,
/// Request ID
id: String,
}
#[derive(Deserialize)]
struct NotificationParams {
/// The ID of the subscription.
subscription: String,
/// Result.
result: Box<RawValue>,
}
#[derive(Deserialize)]
struct ResponseNotification {
/// JSON-RPC version.
#[allow(unused)]
jsonrpc: String,
/// RPC method that generated the notification.
method: String,
/// Result.
params: NotificationParams,
}
#[derive(Deserialize)]
struct ErrorResponse {
/// JSON-RPC version.
#[allow(unused)]
jsonrpc: String,
/// Request ID.
id: String,
/// Error.
error: Box<RawValue>,
}
// Check if the response can be mapped as an RPC method response.
let result: Result<Response, _> = serde_json::from_str(response);
if let Ok(response) = result {
return Ok(RpcResponse::Method {
id: response.id,
result: response.result,
});
}
let result: Result<ResponseNotification, _> = serde_json::from_str(response);
if let Ok(notification) = result {
return Ok(RpcResponse::Subscription {
id: notification.params.subscription,
method: notification.method,
result: notification.params.result,
});
}
let error: ErrorResponse = serde_json::from_str(response)?;
Ok(RpcResponse::Error {
id: error.id,
error: error.error,
})
}
}
+114
View File
@@ -0,0 +1,114 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use super::{
background::{BackgroundTask, FromSubxt, MethodResponse},
LightClientRpcError,
};
use serde_json::value::RawValue;
use tokio::sync::{mpsc, mpsc::error::SendError, oneshot};
use super::platform::build_platform;
pub const LOG_TARGET: &str = "light-client";
/// The light-client RPC implementation that is used to connect with the chain.
#[derive(Clone)]
pub struct LightClientRpc {
/// Communicate with the backend task that multiplexes the responses
/// back to the frontend.
to_backend: mpsc::UnboundedSender<FromSubxt>,
}
impl LightClientRpc {
/// Constructs a new [`LightClientRpc`], providing the chain specification.
///
/// The chain specification can be downloaded from a trusted network via
/// the `sync_state_genSyncSpec` RPC method. This parameter expects the
/// chain spec in text format (ie not in hex-encoded scale-encoded as RPC methods
/// will provide).
///
/// ## Panics
///
/// Panics if being called outside of `tokio` runtime context.
pub fn new(
config: smoldot_light::AddChainConfig<'_, (), impl Iterator<Item = smoldot_light::ChainId>>,
) -> Result<LightClientRpc, LightClientRpcError> {
tracing::trace!(target: LOG_TARGET, "Create light client");
let mut client = smoldot_light::Client::new(build_platform());
let smoldot_light::AddChainSuccess {
chain_id,
json_rpc_responses,
} = client
.add_chain(config)
.map_err(|err| LightClientRpcError::AddChainError(err.to_string()))?;
let (to_backend, backend) = mpsc::unbounded_channel();
// `json_rpc_responses` can only be `None` if we had passed `json_rpc: Disabled`.
let rpc_responses = json_rpc_responses.expect("Light client RPC configured; qed");
let future = async move {
let mut task = BackgroundTask::new(client, chain_id);
task.start_task(backend, rpc_responses).await;
};
#[cfg(feature = "native")]
tokio::spawn(future);
#[cfg(feature = "web")]
wasm_bindgen_futures::spawn_local(future);
Ok(LightClientRpc { to_backend })
}
/// Submits an RPC method request to the light-client.
///
/// This method sends a request to the light-client to execute an RPC method with the provided parameters.
/// The parameters are parsed into a valid JSON object in the background.
pub fn method_request(
&self,
method: String,
params: String,
) -> Result<oneshot::Receiver<MethodResponse>, SendError<FromSubxt>> {
let (sender, receiver) = oneshot::channel();
self.to_backend.send(FromSubxt::Request {
method,
params,
sender,
})?;
Ok(receiver)
}
/// Makes an RPC subscription call to the light-client.
///
/// This method sends a request to the light-client to establish an RPC subscription with the provided parameters.
/// The parameters are parsed into a valid JSON object in the background.
pub fn subscription_request(
&self,
method: String,
params: String,
) -> Result<
(
oneshot::Receiver<MethodResponse>,
mpsc::UnboundedReceiver<Box<RawValue>>,
),
SendError<FromSubxt>,
> {
let (sub_id, sub_id_rx) = oneshot::channel();
let (sender, receiver) = mpsc::unbounded_channel();
self.to_backend.send(FromSubxt::Subscription {
method,
params,
sub_id,
sender,
})?;
Ok((sub_id_rx, receiver))
}
}
+48
View File
@@ -0,0 +1,48 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! Low level light client implementation for RPC method and
//! subscriptions requests.
//!
//! The client implementation supports both native and wasm
//! environments.
//!
//! This leverages the smoldot crate to connect to the chain.
#![deny(
missing_docs,
unused_crate_dependencies,
unused_extern_crates,
clippy::all
)]
#![allow(clippy::type_complexity)]
#[cfg(any(
all(feature = "web", feature = "native"),
not(any(feature = "web", feature = "native"))
))]
compile_error!("subxt: exactly one of the 'web' and 'native' features should be used.");
mod background;
mod client;
mod platform;
// Used to enable the js feature for wasm.
#[cfg(feature = "web")]
#[allow(unused_imports)]
pub use getrandom as _;
pub use client::LightClientRpc;
pub use smoldot_light::{AddChainConfig, AddChainConfigJsonRpc, ChainId};
/// Light client error.
#[derive(Debug, thiserror::Error)]
pub enum LightClientRpcError {
/// Error encountered while adding the chain to the light-client.
#[error("Failed to add the chain to the light client: {0}.")]
AddChainError(String),
/// Error originated while trying to submit a RPC request.
#[error("RPC request cannot be sent: {0}.")]
Request(String),
}
+40
View File
@@ -0,0 +1,40 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! Default platform for WASM environments.
#[cfg(feature = "web")]
mod wasm_helpers;
#[cfg(feature = "web")]
mod wasm_platform;
#[cfg(feature = "web")]
mod wasm_socket;
pub use helpers::{build_platform, PlatformType};
#[cfg(feature = "native")]
mod helpers {
use smoldot_light::platform::default::DefaultPlatform as Platform;
use std::sync::Arc;
pub type PlatformType = Arc<Platform>;
pub fn build_platform() -> PlatformType {
Platform::new(
"subxt-light-client".into(),
env!("CARGO_PKG_VERSION").into(),
)
}
}
#[cfg(feature = "web")]
mod helpers {
use super::wasm_platform::SubxtPlatform as Platform;
pub type PlatformType = Platform;
pub fn build_platform() -> PlatformType {
Platform::new()
}
}
+126
View File
@@ -0,0 +1,126 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! Wasm implementation for the light client's platform using
//! custom websockets.
use core::time::Duration;
use futures_util::{future, FutureExt};
use smoldot::libp2p::multiaddr::ProtocolRef;
use smoldot_light::platform::{ConnectError, PlatformConnection};
use std::{
collections::VecDeque,
net::{IpAddr, SocketAddr},
};
use super::wasm_socket::WasmSocket;
pub fn spawn(task: future::BoxFuture<'static, ()>) {
wasm_bindgen_futures::spawn_local(task);
}
pub fn now_from_unix_epoch() -> Duration {
instant::SystemTime::now()
.duration_since(instant::SystemTime::UNIX_EPOCH)
.unwrap_or_else(|_| {
panic!("Invalid systime cannot be configured earlier than `UNIX_EPOCH`")
})
}
pub type Instant = instant::Instant;
pub fn now() -> Instant {
instant::Instant::now()
}
pub type Delay = future::BoxFuture<'static, ()>;
pub fn sleep(duration: Duration) -> Delay {
futures_timer::Delay::new(duration).boxed()
}
pub struct Stream {
pub socket: WasmSocket,
/// Read and write buffers of the connection, or `None` if the socket has been reset.
pub buffers: Option<(StreamReadBuffer, StreamWriteBuffer)>,
}
pub enum StreamReadBuffer {
Open {
buffer: Vec<u8>,
cursor: std::ops::Range<usize>,
},
Closed,
}
pub enum StreamWriteBuffer {
Open {
buffer: VecDeque<u8>,
must_flush: bool,
must_close: bool,
},
Closed,
}
pub async fn connect<'a>(
proto1: ProtocolRef<'a>,
proto2: ProtocolRef<'a>,
proto3: Option<ProtocolRef<'a>>,
) -> Result<PlatformConnection<Stream, std::convert::Infallible>, ConnectError> {
// Ensure ahead of time that the multiaddress is supported.
let addr = match (&proto1, &proto2, &proto3) {
(ProtocolRef::Ip4(ip), ProtocolRef::Tcp(port), Some(ProtocolRef::Ws)) => {
let addr = SocketAddr::new(IpAddr::V4((*ip).into()), *port);
format!("ws://{}", addr.to_string())
}
(ProtocolRef::Ip6(ip), ProtocolRef::Tcp(port), Some(ProtocolRef::Ws)) => {
let addr = SocketAddr::new(IpAddr::V6((*ip).into()), *port);
format!("ws://{}", addr.to_string())
}
(
ProtocolRef::Dns(addr) | ProtocolRef::Dns4(addr) | ProtocolRef::Dns6(addr),
ProtocolRef::Tcp(port),
Some(ProtocolRef::Ws),
) => {
format!("ws://{}:{}", addr.to_string(), port)
}
(
ProtocolRef::Dns(addr) | ProtocolRef::Dns4(addr) | ProtocolRef::Dns6(addr),
ProtocolRef::Tcp(port),
Some(ProtocolRef::Wss),
) => {
format!("wss://{}:{}", addr.to_string(), port)
}
_ => {
return Err(ConnectError {
is_bad_addr: true,
message: "Unknown protocols combination".to_string(),
})
}
};
tracing::debug!("Connecting to addr={addr}");
let socket = WasmSocket::new(addr.as_str()).map_err(|err| ConnectError {
is_bad_addr: false,
message: format!("Failed to reach peer: {err}"),
})?;
Ok(PlatformConnection::SingleStreamMultistreamSelectNoiseYamux(
Stream {
socket,
buffers: Some((
StreamReadBuffer::Open {
buffer: vec![0; 16384],
cursor: 0..0,
},
StreamWriteBuffer::Open {
buffer: VecDeque::with_capacity(16384),
must_close: false,
must_flush: false,
},
)),
},
))
}
+302
View File
@@ -0,0 +1,302 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use core::time::Duration;
use futures::{prelude::*, task::Poll};
use smoldot::libp2p::multiaddr::Multiaddr;
use smoldot_light::platform::{
ConnectError, PlatformConnection, PlatformRef, PlatformSubstreamDirection, ReadBuffer,
};
use std::{io::IoSlice, pin::Pin};
use super::wasm_helpers::{StreamReadBuffer, StreamWriteBuffer};
/// Subxt plaform implementation for wasm.
///
/// This implementation is a conversion of the implementation from the smoldot:
/// https://github.com/smol-dot/smoldot/blob/f49ce4ea6a325c444ab6ad37d3ab5558edf0d541/light-base/src/platform/default.rs#L52.
///
/// This platform will evolve over time and we'll need to keep this code in sync.
#[derive(Clone)]
pub struct SubxtPlatform {}
impl SubxtPlatform {
pub fn new() -> Self {
SubxtPlatform {}
}
}
impl PlatformRef for SubxtPlatform {
type Delay = super::wasm_helpers::Delay;
type Yield = future::Ready<()>;
type Instant = super::wasm_helpers::Instant;
type Connection = std::convert::Infallible;
type Stream = super::wasm_helpers::Stream;
type ConnectFuture = future::BoxFuture<
'static,
Result<PlatformConnection<Self::Stream, Self::Connection>, ConnectError>,
>;
type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
type NextSubstreamFuture<'a> =
future::Pending<Option<(Self::Stream, PlatformSubstreamDirection)>>;
fn now_from_unix_epoch(&self) -> Duration {
super::wasm_helpers::now_from_unix_epoch()
}
fn now(&self) -> Self::Instant {
super::wasm_helpers::now()
}
fn sleep(&self, duration: Duration) -> Self::Delay {
super::wasm_helpers::sleep(duration)
}
fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
self.sleep(when.saturating_duration_since(self.now()))
}
fn yield_after_cpu_intensive(&self) -> Self::Yield {
// No-op.
future::ready(())
}
fn connect(&self, multiaddr: &str) -> Self::ConnectFuture {
// We simply copy the address to own it. We could be more zero-cost here, but doing so
// would considerably complicate the implementation.
let multiaddr = multiaddr.to_owned();
tracing::debug!("Connecting to multiaddress={:?}", multiaddr);
Box::pin(async move {
let addr = multiaddr.parse::<Multiaddr>().map_err(|_| ConnectError {
is_bad_addr: true,
message: "Failed to parse address".to_string(),
})?;
let mut iter = addr.iter().fuse();
let proto1 = iter.next().ok_or(ConnectError {
is_bad_addr: true,
message: "Unknown protocols combination".to_string(),
})?;
let proto2 = iter.next().ok_or(ConnectError {
is_bad_addr: true,
message: "Unknown protocols combination".to_string(),
})?;
let proto3 = iter.next();
if iter.next().is_some() {
return Err(ConnectError {
is_bad_addr: true,
message: "Unknown protocols combination".to_string(),
});
}
super::wasm_helpers::connect(proto1, proto2, proto3).await
})
}
fn open_out_substream(&self, c: &mut Self::Connection) {
// This function can only be called with so-called "multi-stream" connections. We never
// open such connection.
match *c {}
}
fn next_substream<'a>(&self, c: &'a mut Self::Connection) -> Self::NextSubstreamFuture<'a> {
// This function can only be called with so-called "multi-stream" connections. We never
// open such connection.
match *c {}
}
fn update_stream<'a>(&self, stream: &'a mut Self::Stream) -> Self::StreamUpdateFuture<'a> {
Box::pin(future::poll_fn(|cx| {
// The `connect` is expected to be called before this method and would populate
// the buffers properly. When the buffers are empty, this future is shortly dropped.
let Some((read_buffer, write_buffer)) = stream.buffers.as_mut() else { return Poll::Pending };
// Whether the future returned by `update_stream` should return `Ready` or `Pending`.
let mut update_stream_future_ready = false;
if let StreamReadBuffer::Open {
buffer: ref mut buf,
ref mut cursor,
} = read_buffer
{
// When reading data from the socket, `poll_read` might return "EOF". In that
// situation, we transition to the `Closed` state, which would discard the data
// currently in the buffer. For this reason, we only try to read if there is no
// data left in the buffer.
if cursor.start == cursor.end {
if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_read(cx, buf) {
update_stream_future_ready = true;
match result {
Err(_) => {
// End the stream.
stream.buffers = None;
return Poll::Ready(());
}
Ok(0) => {
// EOF.
*read_buffer = StreamReadBuffer::Closed;
}
Ok(bytes) => {
*cursor = 0..bytes;
}
}
}
}
}
if let StreamWriteBuffer::Open {
buffer: ref mut buf,
must_flush,
must_close,
} = write_buffer
{
while !buf.is_empty() {
let write_queue_slices = buf.as_slices();
if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_write_vectored(
cx,
&[
IoSlice::new(write_queue_slices.0),
IoSlice::new(write_queue_slices.1),
],
) {
if !*must_close {
// In the situation where the API user wants to close the writing
// side, simply sending the buffered data isn't enough to justify
// making the future ready.
update_stream_future_ready = true;
}
match result {
Err(_) => {
// End the stream.
stream.buffers = None;
return Poll::Ready(());
}
Ok(bytes) => {
*must_flush = true;
for _ in 0..bytes {
buf.pop_front();
}
}
}
} else {
break;
}
}
if buf.is_empty() && *must_close {
if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_close(cx) {
update_stream_future_ready = true;
match result {
Err(_) => {
// End the stream.
stream.buffers = None;
return Poll::Ready(());
}
Ok(()) => {
*write_buffer = StreamWriteBuffer::Closed;
}
}
}
} else if *must_flush {
if let Poll::Ready(result) = Pin::new(&mut stream.socket).poll_flush(cx) {
update_stream_future_ready = true;
match result {
Err(_) => {
// End the stream.
stream.buffers = None;
return Poll::Ready(());
}
Ok(()) => {
*must_flush = false;
}
}
}
}
}
if update_stream_future_ready {
Poll::Ready(())
} else {
// Progress cannot be made since poll_read, poll_write, poll_close, poll_flush
// are not ready yet. Smoldot drops this future and calls it again with the
// next processing iteration.
Poll::Pending
}
}))
}
fn read_buffer<'a>(&self, stream: &'a mut Self::Stream) -> ReadBuffer<'a> {
match stream.buffers.as_ref().map(|(r, _)| r) {
None => ReadBuffer::Reset,
Some(StreamReadBuffer::Closed) => ReadBuffer::Closed,
Some(StreamReadBuffer::Open { buffer, cursor }) => {
ReadBuffer::Open(&buffer[cursor.clone()])
}
}
}
fn advance_read_cursor(&self, stream: &mut Self::Stream, extra_bytes: usize) {
let Some(StreamReadBuffer::Open { ref mut cursor, .. }) =
stream.buffers.as_mut().map(|(r, _)| r)
else {
assert_eq!(extra_bytes, 0);
return
};
assert!(cursor.start + extra_bytes <= cursor.end);
cursor.start += extra_bytes;
}
fn writable_bytes(&self, stream: &mut Self::Stream) -> usize {
let Some(StreamWriteBuffer::Open { ref mut buffer, must_close: false, ..}) =
stream.buffers.as_mut().map(|(_, w)| w) else { return 0 };
buffer.capacity() - buffer.len()
}
fn send(&self, stream: &mut Self::Stream, data: &[u8]) {
debug_assert!(!data.is_empty());
// Because `writable_bytes` returns 0 if the writing side is closed, and because `data`
// must always have a size inferior or equal to `writable_bytes`, we know for sure that
// the writing side isn't closed.
let Some(StreamWriteBuffer::Open { ref mut buffer, .. } )=
stream.buffers.as_mut().map(|(_, w)| w) else { panic!() };
buffer.reserve(data.len());
buffer.extend(data.iter().copied());
}
fn close_send(&self, stream: &mut Self::Stream) {
// It is not illegal to call this on an already-reset stream.
let Some((_, write_buffer)) = stream.buffers.as_mut() else { return };
match write_buffer {
StreamWriteBuffer::Open {
must_close: must_close @ false,
..
} => *must_close = true,
_ => {
// However, it is illegal to call this on a stream that was already close
// attempted.
panic!()
}
}
}
fn spawn_task(&self, _: std::borrow::Cow<str>, task: future::BoxFuture<'static, ()>) {
super::wasm_helpers::spawn(task);
}
fn client_name(&self) -> std::borrow::Cow<str> {
"subxt-light-client".into()
}
fn client_version(&self) -> std::borrow::Cow<str> {
env!("CARGO_PKG_VERSION").into()
}
}
+237
View File
@@ -0,0 +1,237 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use futures::{io, prelude::*};
use send_wrapper::SendWrapper;
use wasm_bindgen::{prelude::*, JsCast};
use std::{
collections::VecDeque,
pin::Pin,
sync::{Arc, Mutex},
task::Poll,
task::{Context, Waker},
};
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Failed to connect {0}")]
ConnectionError(String),
}
/// Websocket for WASM environments.
///
/// This is a rust-based wrapper around browser's WebSocket API.
///
// Warning: It is not safe to have `Clone` on this structure.
pub struct WasmSocket {
/// Inner data shared between `poll` and web_sys callbacks.
inner: Arc<Mutex<InnerWasmSocket>>,
/// This implements `Send` and panics if the value is accessed
/// or dropped from another thread.
///
/// This is safe in wasm environments.
socket: SendWrapper<web_sys::WebSocket>,
/// In memory callbacks to handle messages from the browser socket.
_callbacks: SendWrapper<Callbacks>,
}
/// The state of the [`WasmSocket`].
#[derive(PartialEq, Eq, Clone, Copy)]
enum ConnectionState {
/// Initial state of the socket.
Connecting,
/// Socket is fully opened.
Opened,
/// Socket is closed.
Closed,
/// Error reported by callbacks.
Error,
}
struct InnerWasmSocket {
/// The state of the connection.
state: ConnectionState,
/// Data buffer for the socket.
data: VecDeque<u8>,
/// Waker from `poll_read` / `poll_write`.
waker: Option<Waker>,
}
/// Registered callbacks of the [`WasmSocket`].
///
/// These need to be kept around until the socket is dropped.
type Callbacks = (
Closure<dyn FnMut()>,
Closure<dyn FnMut(web_sys::MessageEvent)>,
Closure<dyn FnMut(web_sys::Event)>,
Closure<dyn FnMut(web_sys::CloseEvent)>,
);
impl WasmSocket {
/// Establish a WebSocket connection.
///
/// The error is a string representing the browser error.
/// Visit [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#exceptions_thrown)
/// for more info.
pub fn new(addr: &str) -> Result<Self, Error> {
let socket = match web_sys::WebSocket::new(addr) {
Ok(socket) => socket,
Err(err) => return Err(Error::ConnectionError(format!("{:?}", err))),
};
socket.set_binary_type(web_sys::BinaryType::Arraybuffer);
let inner = Arc::new(Mutex::new(InnerWasmSocket {
state: ConnectionState::Connecting,
data: VecDeque::with_capacity(16384),
waker: None,
}));
let open_callback = Closure::<dyn FnMut()>::new({
let inner = inner.clone();
move || {
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Opened;
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onopen(Some(open_callback.as_ref().unchecked_ref()));
let message_callback = Closure::<dyn FnMut(_)>::new({
let inner = inner.clone();
move |event: web_sys::MessageEvent| {
let Ok(buffer) = event.data().dyn_into::<js_sys::ArrayBuffer>() else {
panic!("Unexpected data format {:?}", event.data());
};
let mut inner = inner.lock().expect("Mutex is poised; qed");
let bytes = js_sys::Uint8Array::new(&buffer).to_vec();
inner.data.extend(bytes.into_iter());
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onmessage(Some(message_callback.as_ref().unchecked_ref()));
let error_callback = Closure::<dyn FnMut(_)>::new({
let inner = inner.clone();
move |_| {
// Callback does not provide useful information, signal it back to the stream.
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Error;
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
let close_callback = Closure::<dyn FnMut(_)>::new({
let inner = inner.clone();
move |_| {
let mut inner = inner.lock().expect("Mutex is poised; qed");
inner.state = ConnectionState::Closed;
if let Some(waker) = inner.waker.take() {
waker.wake();
}
}
});
socket.set_onclose(Some(close_callback.as_ref().unchecked_ref()));
let callbacks = (
open_callback,
message_callback,
error_callback,
close_callback,
);
Ok(Self {
inner,
socket: SendWrapper::new(socket),
_callbacks: SendWrapper::new(callbacks),
})
}
}
impl AsyncRead for WasmSocket {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize, io::Error>> {
let mut inner = self.inner.lock().expect("Mutex is poised; qed");
inner.waker = Some(cx.waker().clone());
match inner.state {
ConnectionState::Error => {
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error")))
}
ConnectionState::Closed => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
ConnectionState::Connecting => Poll::Pending,
ConnectionState::Opened => {
if inner.data.is_empty() {
return Poll::Pending;
}
let n = inner.data.len().min(buf.len());
for k in buf.iter_mut().take(n) {
*k = inner.data.pop_front().expect("Buffer non empty; qed");
}
Poll::Ready(Ok(n))
}
}
}
}
impl AsyncWrite for WasmSocket {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
let mut inner = self.inner.lock().expect("Mutex is poised; qed");
inner.waker = Some(cx.waker().clone());
match inner.state {
ConnectionState::Error => {
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "Socket error")))
}
ConnectionState::Closed => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
ConnectionState::Connecting => Poll::Pending,
ConnectionState::Opened => match self.socket.send_with_u8_array(buf) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(err) => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
format!("Write error: {err:?}"),
))),
},
}
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
impl Drop for WasmSocket {
fn drop(&mut self) {
let inner = self.inner.lock().expect("Mutex is poised; qed");
if inner.state == ConnectionState::Opened {
let _ = self.socket.close();
}
}
}