fix: Resolve cargo clippy errors and add CI workflow plan
## Changes
### Clippy Fixes
- Fixed deprecated `cargo_bin` usage in 27 test files (added #![allow(deprecated)])
- Fixed uninlined_format_args in zombienet-sdk-tests
- Fixed subxt API changes in revive/rpc/tests.rs (fetch signature, StorageValue)
- Fixed dead_code warnings in validator-pool and identity-kyc mocks
- Fixed field name `i` -> `_i` in tasks example
### CI Infrastructure
- Added .claude/WORKFLOW_PLAN.md for tracking CI fix progress
- Updated lychee.toml and taplo.toml configs
### Files Modified
- 27 test files with deprecated cargo_bin fix
- bizinikiwi/pezframe/revive/rpc/src/tests.rs (subxt API)
- pezkuwi/pezpallets/validator-pool/src/{mock,tests}.rs
- pezcumulus/teyrchains/pezpallets/identity-kyc/src/mock.rs
- bizinikiwi/pezframe/examples/tasks/src/tests.rs
## Status
- cargo clippy: PASSING
- Next: cargo fmt, zepter, workspace checks
This commit is contained in:
-77
@@ -1,77 +0,0 @@
|
||||
[package]
|
||||
name = "pezkuwi-subxt-lightclient"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
publish = true
|
||||
|
||||
license.workspace = true
|
||||
readme = "../README.md"
|
||||
repository.workspace = true
|
||||
documentation.workspace = true
|
||||
homepage.workspace = true
|
||||
description = "Light Client for chain interaction"
|
||||
keywords = ["parity", "substrate", "blockchain"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[features]
|
||||
default = ["native"]
|
||||
|
||||
# Enable this for native (ie non web/wasm builds).
|
||||
# Exactly 1 of "web" and "native" is expected.
|
||||
native = [
|
||||
"smoldot-light/std",
|
||||
"tokio/rt",
|
||||
]
|
||||
|
||||
# Enable this for web/wasm builds.
|
||||
# Exactly 1 of "web" and "native" is expected.
|
||||
web = [
|
||||
"getrandom/js",
|
||||
"smoldot/std",
|
||||
|
||||
# For the light-client platform.
|
||||
"wasm-bindgen-futures",
|
||||
"futures-timer/wasm-bindgen",
|
||||
"web-time",
|
||||
"pin-project",
|
||||
|
||||
# For websocket.
|
||||
"js-sys",
|
||||
"send_wrapper",
|
||||
"web-sys",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[dependencies]
|
||||
futures = { workspace = true, features = ["async-await"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true, features = ["default", "raw_value"] }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
smoldot-light = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
futures-util = { workspace = true }
|
||||
|
||||
# Only needed for web
|
||||
js-sys = { workspace = true, optional = true }
|
||||
send_wrapper = { workspace = true, optional = true }
|
||||
web-sys = { workspace = true, optional = true }
|
||||
wasm-bindgen = { workspace = true, optional = true }
|
||||
wasm-bindgen-futures = { workspace = true, optional = true }
|
||||
smoldot = { workspace = true, optional = true }
|
||||
pin-project = { workspace = true, optional = true }
|
||||
futures-timer = { workspace = true, optional = true }
|
||||
web-time = { workspace = true, optional = true }
|
||||
getrandom = { workspace = true, optional = true }
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
default-features = true
|
||||
rustdoc-args = ["--cfg", "docsrs"]
|
||||
|
||||
[package.metadata.playground]
|
||||
default-features = true
|
||||
-530
@@ -1,530 +0,0 @@
|
||||
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use crate::rpc::RpcResponse;
|
||||
use crate::shared_client::SharedClient;
|
||||
use crate::{JsonRpcError, LightClientRpcError};
|
||||
use futures::{FutureExt, stream::StreamExt};
|
||||
use serde_json::value::RawValue;
|
||||
use smoldot_light::platform::PlatformRef;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
const LOG_TARGET: &str = "subxt-light-client-background-task";
|
||||
|
||||
/// Response from [`BackgroundTaskHandle::request()`].
|
||||
pub type MethodResponse = Result<Box<RawValue>, LightClientRpcError>;
|
||||
|
||||
/// Response from [`BackgroundTaskHandle::subscribe()`].
|
||||
pub type SubscriptionResponse = Result<
|
||||
(
|
||||
SubscriptionId,
|
||||
mpsc::UnboundedReceiver<Result<Box<RawValue>, JsonRpcError>>,
|
||||
),
|
||||
LightClientRpcError,
|
||||
>;
|
||||
|
||||
/// Type of subscription IDs we can get back.
|
||||
pub type SubscriptionId = String;
|
||||
|
||||
/// Message protocol between the front-end client that submits the RPC requests
|
||||
/// and the background task which fetches responses from Smoldot. Hidden behind
|
||||
/// the [`BackgroundTaskHandle`].
|
||||
#[derive(Debug)]
|
||||
enum Message {
|
||||
/// The RPC method request.
|
||||
Request {
|
||||
/// The method of the request.
|
||||
method: String,
|
||||
/// The parameters of the request.
|
||||
params: Option<Box<RawValue>>,
|
||||
/// Channel used to send back the method response.
|
||||
sender: oneshot::Sender<MethodResponse>,
|
||||
},
|
||||
/// The RPC subscription (pub/sub) request.
|
||||
Subscription {
|
||||
/// The method of the request.
|
||||
method: String,
|
||||
/// The method to unsubscribe.
|
||||
unsubscribe_method: String,
|
||||
/// The parameters of the request.
|
||||
params: Option<Box<RawValue>>,
|
||||
/// Channel used to send back the subscription response.
|
||||
sender: oneshot::Sender<SubscriptionResponse>,
|
||||
},
|
||||
}
|
||||
|
||||
/// A handle to communicate with the background task.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BackgroundTaskHandle {
|
||||
to_backend: mpsc::UnboundedSender<Message>,
|
||||
}
|
||||
|
||||
impl BackgroundTaskHandle {
|
||||
/// Make an RPC request via the background task.
|
||||
pub async fn request(&self, method: String, params: Option<Box<RawValue>>) -> MethodResponse {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_backend
|
||||
.send(Message::Request {
|
||||
method,
|
||||
params,
|
||||
sender: tx,
|
||||
})
|
||||
.map_err(|_e| LightClientRpcError::BackgroundTaskDropped)?;
|
||||
|
||||
match rx.await {
|
||||
Err(_e) => Err(LightClientRpcError::BackgroundTaskDropped),
|
||||
Ok(response) => response,
|
||||
}
|
||||
}
|
||||
|
||||
/// Subscribe to some RPC method via the background task.
|
||||
pub async fn subscribe(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
unsubscribe_method: String,
|
||||
) -> SubscriptionResponse {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_backend
|
||||
.send(Message::Subscription {
|
||||
method,
|
||||
params,
|
||||
unsubscribe_method,
|
||||
sender: tx,
|
||||
})
|
||||
.map_err(|_e| LightClientRpcError::BackgroundTaskDropped)?;
|
||||
|
||||
match rx.await {
|
||||
Err(_e) => Err(LightClientRpcError::BackgroundTaskDropped),
|
||||
Ok(response) => response,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A background task which runs with [`BackgroundTask::run()`] and manages messages
|
||||
/// coming to/from Smoldot.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
|
||||
channels: BackgroundTaskChannels<TPlatform>,
|
||||
data: BackgroundTaskData<TPlatform, TChain>,
|
||||
}
|
||||
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
/// Constructs a new [`BackgroundTask`].
|
||||
pub(crate) fn new(
|
||||
client: SharedClient<TPlatform, TChain>,
|
||||
chain_id: smoldot_light::ChainId,
|
||||
from_back: smoldot_light::JsonRpcResponses<TPlatform>,
|
||||
) -> (BackgroundTask<TPlatform, TChain>, BackgroundTaskHandle) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let bg_task = BackgroundTask {
|
||||
channels: BackgroundTaskChannels {
|
||||
from_front: UnboundedReceiverStream::new(rx),
|
||||
from_back,
|
||||
},
|
||||
data: BackgroundTaskData {
|
||||
client,
|
||||
chain_id,
|
||||
last_request_id: 0,
|
||||
pending_subscriptions: HashMap::new(),
|
||||
requests: HashMap::new(),
|
||||
subscriptions: HashMap::new(),
|
||||
},
|
||||
};
|
||||
|
||||
let bg_handle = BackgroundTaskHandle { to_backend: tx };
|
||||
|
||||
(bg_task, bg_handle)
|
||||
}
|
||||
|
||||
/// Run the background task, which:
|
||||
/// - Forwards messages/subscription requests to Smoldot from the front end.
|
||||
/// - Forwards responses back from Smoldot to the front end.
|
||||
pub async fn run(self) {
|
||||
let chain_id = self.data.chain_id;
|
||||
let mut channels = self.channels;
|
||||
let mut data = self.data;
|
||||
|
||||
loop {
|
||||
tokio::pin! {
|
||||
let from_front_fut = channels.from_front.next().fuse();
|
||||
let from_back_fut = channels.from_back.next().fuse();
|
||||
}
|
||||
|
||||
futures::select! {
|
||||
// Message coming from the front end/client.
|
||||
front_message = from_front_fut => {
|
||||
let Some(message) = front_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Subxt channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received register message {:?}",
|
||||
message
|
||||
);
|
||||
|
||||
data.handle_requests(message).await;
|
||||
},
|
||||
// Message coming from Smoldot.
|
||||
back_message = from_back_fut => {
|
||||
let Some(back_message) = back_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
|
||||
break;
|
||||
};
|
||||
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received smoldot RPC chain {chain_id:?} result {}",
|
||||
trim_message(&back_message),
|
||||
);
|
||||
|
||||
data.handle_rpc_response(back_message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Task closed");
|
||||
}
|
||||
}
|
||||
|
||||
struct BackgroundTaskChannels<TPlatform: PlatformRef> {
|
||||
/// Messages sent into this background task from the front end.
|
||||
from_front: UnboundedReceiverStream<Message>,
|
||||
/// Messages sent into the background task from Smoldot.
|
||||
from_back: smoldot_light::JsonRpcResponses<TPlatform>,
|
||||
}
|
||||
|
||||
struct BackgroundTaskData<TPlatform: PlatformRef, TChain> {
|
||||
/// A smoldot light client that can be shared.
|
||||
client: SharedClient<TPlatform, TChain>,
|
||||
/// Knowing the chain ID helps with debugging, but isn't otherwise necessary.
|
||||
chain_id: smoldot_light::ChainId,
|
||||
/// Know which Id to use next for new requests/subscriptions.
|
||||
last_request_id: usize,
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
requests: HashMap<usize, oneshot::Sender<MethodResponse>>,
|
||||
/// Subscription calls first need to make a plain RPC method
|
||||
/// request to obtain the subscription ID.
|
||||
///
|
||||
/// The RPC method request is made in the background and the response should
|
||||
/// not be sent back to the user.
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
pending_subscriptions: HashMap<usize, PendingSubscription>,
|
||||
/// Map the subscription ID to the frontend `Sender`.
|
||||
///
|
||||
/// The subscription ID is entirely generated by the node (smoldot). Therefore, it is
|
||||
/// possible for two distinct subscriptions of different chains to have the same subscription ID.
|
||||
subscriptions: HashMap<String, ActiveSubscription>,
|
||||
}
|
||||
|
||||
/// The state needed to resolve the subscription ID and send
|
||||
/// back the response to frontend.
|
||||
struct PendingSubscription {
|
||||
/// Send the method response ID back to the user.
|
||||
///
|
||||
/// It contains the subscription ID if successful, or an JSON RPC error object.
|
||||
response_sender: oneshot::Sender<SubscriptionResponse>,
|
||||
/// The unsubscribe method to call when the user drops the receiver
|
||||
/// part of the channel.
|
||||
unsubscribe_method: String,
|
||||
}
|
||||
|
||||
/// The state of the subscription.
|
||||
struct ActiveSubscription {
|
||||
/// Channel to send the subscription notifications back to frontend.
|
||||
notification_sender: mpsc::UnboundedSender<Result<Box<RawValue>, JsonRpcError>>,
|
||||
/// The unsubscribe method to call when the user drops the receiver
|
||||
/// part of the channel.
|
||||
unsubscribe_method: String,
|
||||
}
|
||||
|
||||
fn trim_message(s: &str) -> &str {
|
||||
const MAX_SIZE: usize = 512;
|
||||
if s.len() < MAX_SIZE {
|
||||
return s;
|
||||
}
|
||||
|
||||
match s.char_indices().nth(MAX_SIZE) {
|
||||
None => s,
|
||||
Some((idx, _)) => &s[..idx],
|
||||
}
|
||||
}
|
||||
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTaskData<TPlatform, TChain> {
|
||||
/// Fetch and increment the request ID.
|
||||
fn next_id(&mut self) -> usize {
|
||||
self.last_request_id = self.last_request_id.wrapping_add(1);
|
||||
self.last_request_id
|
||||
}
|
||||
|
||||
/// Handle the registration messages received from the user.
|
||||
async fn handle_requests(&mut self, message: Message) {
|
||||
match message {
|
||||
Message::Request {
|
||||
method,
|
||||
params,
|
||||
sender,
|
||||
} => {
|
||||
let id = self.next_id();
|
||||
let chain_id = self.chain_id;
|
||||
|
||||
let params = match ¶ms {
|
||||
Some(params) => params.get(),
|
||||
None => "null",
|
||||
};
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{id}", "method":"{method}","params":{params}}}"#
|
||||
);
|
||||
|
||||
self.requests.insert(id, sender);
|
||||
tracing::trace!(target: LOG_TARGET, "Tracking request id={id} chain={chain_id:?}");
|
||||
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
if let Err(err) = result {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send RPC request to lightclient {:?}",
|
||||
err.to_string()
|
||||
);
|
||||
|
||||
let sender = self
|
||||
.requests
|
||||
.remove(&id)
|
||||
.expect("Channel is inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
if sender
|
||||
.send(Err(LightClientRpcError::SmoldotError(err.to_string())))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send RPC request error to id={id}",
|
||||
);
|
||||
}
|
||||
} else {
|
||||
tracing::trace!(target: LOG_TARGET, "Submitted to smoldot request with id={id}");
|
||||
}
|
||||
}
|
||||
Message::Subscription {
|
||||
method,
|
||||
unsubscribe_method,
|
||||
params,
|
||||
sender,
|
||||
} => {
|
||||
let id = self.next_id();
|
||||
let chain_id = self.chain_id;
|
||||
|
||||
// For subscriptions we need to make a plain RPC request to the subscription method.
|
||||
// The server will return as a result the subscription ID.
|
||||
let params = match ¶ms {
|
||||
Some(params) => params.get(),
|
||||
None => "null",
|
||||
};
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{id}", "method":"{method}","params":{params}}}"#
|
||||
);
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}");
|
||||
let pending_subscription = PendingSubscription {
|
||||
response_sender: sender,
|
||||
unsubscribe_method,
|
||||
};
|
||||
self.pending_subscriptions.insert(id, pending_subscription);
|
||||
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
if let Err(err) = result {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send RPC request to lightclient {:?}",
|
||||
err.to_string()
|
||||
);
|
||||
let subscription_id_state = self
|
||||
.pending_subscriptions
|
||||
.remove(&id)
|
||||
.expect("Channels are inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
if subscription_id_state
|
||||
.response_sender
|
||||
.send(Err(LightClientRpcError::SmoldotError(err.to_string())))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send RPC request error to id={id}",
|
||||
);
|
||||
}
|
||||
} else {
|
||||
tracing::trace!(target: LOG_TARGET, "Submitted to smoldot subscription request with id={id}");
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Parse the response received from the light client and sent it to the appropriate user.
|
||||
fn handle_rpc_response(&mut self, response: String) {
|
||||
let chain_id = self.chain_id;
|
||||
tracing::trace!(target: LOG_TARGET, "Received from smoldot response='{}' chain={chain_id:?}", trim_message(&response));
|
||||
|
||||
match RpcResponse::from_str(&response) {
|
||||
Ok(RpcResponse::Method { id, result }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send response. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
// Send the response back.
|
||||
if let Some(sender) = self.requests.remove(&id) {
|
||||
if sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(pending_subscription) = self.pending_subscriptions.remove(&id) {
|
||||
let Ok(sub_id) = serde_json::from_str::<SubscriptionId>(result.get()) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription id='{result}' chain={chain_id:?} is not a valid string",
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");
|
||||
|
||||
let (sub_tx, sub_rx) = mpsc::unbounded_channel();
|
||||
|
||||
// Send the method response and a channel to receive notifications back.
|
||||
if pending_subscription
|
||||
.response_sender
|
||||
.send(Ok((sub_id.clone(), sub_rx)))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send subscription ID response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Store the other end of the notif channel to send future subscription notifications to.
|
||||
self.subscriptions.insert(
|
||||
sub_id,
|
||||
ActiveSubscription {
|
||||
notification_sender: sub_tx,
|
||||
unsubscribe_method: pending_subscription.unsubscribe_method,
|
||||
},
|
||||
);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Response id={id} chain={chain_id:?} is not tracked",
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::MethodError { id, error }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send error. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(sender) = self.requests.remove(&id) {
|
||||
if sender
|
||||
.send(Err(LightClientRpcError::JsonRpcError(JsonRpcError(error))))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(subscription_id_state) = self.pending_subscriptions.remove(&id) {
|
||||
if subscription_id_state
|
||||
.response_sender
|
||||
.send(Err(LightClientRpcError::JsonRpcError(JsonRpcError(error))))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id {id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::Notification {
|
||||
method,
|
||||
subscription_id,
|
||||
result,
|
||||
}) => {
|
||||
let Some(active_subscription) = self.subscriptions.get_mut(&subscription_id) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription response id={subscription_id} chain={chain_id:?} method={method} is not tracked",
|
||||
);
|
||||
return;
|
||||
};
|
||||
if active_subscription
|
||||
.notification_sender
|
||||
.send(Ok(result))
|
||||
.is_err()
|
||||
{
|
||||
self.unsubscribe(&subscription_id, chain_id);
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::NotificationError {
|
||||
method,
|
||||
subscription_id,
|
||||
error,
|
||||
}) => {
|
||||
let Some(active_subscription) = self.subscriptions.get_mut(&subscription_id) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription error id={subscription_id} chain={chain_id:?} method={method} is not tracked",
|
||||
);
|
||||
return;
|
||||
};
|
||||
if active_subscription
|
||||
.notification_sender
|
||||
.send(Err(JsonRpcError(error)))
|
||||
.is_err()
|
||||
{
|
||||
self.unsubscribe(&subscription_id, chain_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!(target: LOG_TARGET, "cannot decode RPC response {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Unsubscribe from a subscription.
|
||||
fn unsubscribe(&mut self, subscription_id: &str, chain_id: smoldot_light::ChainId) {
|
||||
let Some(active_subscription) = self.subscriptions.remove(subscription_id) else {
|
||||
// Subscription doesn't exist so nothing more to do.
|
||||
return;
|
||||
};
|
||||
|
||||
// Build a call to unsubscribe from this method.
|
||||
let unsub_id = self.next_id();
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
|
||||
unsub_id, active_subscription.unsubscribe_method, subscription_id
|
||||
);
|
||||
|
||||
// Submit it.
|
||||
if let Err(err) = self.client.json_rpc_request(request, chain_id) {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to unsubscribe id={subscription_id} chain={chain_id:?} method={:?} err={err:?}", active_subscription.unsubscribe_method
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(target: LOG_TARGET,"Unsubscribe id={subscription_id} chain={chain_id:?} method={:?}", active_subscription.unsubscribe_method);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,71 +0,0 @@
|
||||
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use serde_json::Value;
|
||||
use std::borrow::Cow;
|
||||
|
||||
/// Something went wrong building chain config.
|
||||
#[non_exhaustive]
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ChainConfigError {
|
||||
/// The provided chain spec is the wrong shape.
|
||||
#[error("Invalid chain spec format")]
|
||||
InvalidSpecFormat,
|
||||
}
|
||||
|
||||
/// Configuration to connect to a chain.
|
||||
pub struct ChainConfig<'a> {
|
||||
// The chain spec to use.
|
||||
chain_spec: Cow<'a, str>,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a str> for ChainConfig<'a> {
|
||||
fn from(chain_spec: &'a str) -> Self {
|
||||
ChainConfig::chain_spec(chain_spec)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for ChainConfig<'_> {
|
||||
fn from(chain_spec: String) -> Self {
|
||||
ChainConfig::chain_spec(chain_spec)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ChainConfig<'a> {
|
||||
/// Construct a chain config from a chain spec.
|
||||
pub fn chain_spec(chain_spec: impl Into<Cow<'a, str>>) -> Self {
|
||||
ChainConfig {
|
||||
chain_spec: chain_spec.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the bootnodes to the given ones.
|
||||
pub fn set_bootnodes<S: AsRef<str>>(
|
||||
self,
|
||||
bootnodes: impl IntoIterator<Item = S>,
|
||||
) -> Result<Self, ChainConfigError> {
|
||||
let mut chain_spec_json: Value = serde_json::from_str(&self.chain_spec)
|
||||
.map_err(|_e| ChainConfigError::InvalidSpecFormat)?;
|
||||
|
||||
if let Value::Object(map) = &mut chain_spec_json {
|
||||
let bootnodes = bootnodes
|
||||
.into_iter()
|
||||
.map(|s| Value::String(s.as_ref().to_owned()))
|
||||
.collect();
|
||||
|
||||
map.insert("bootNodes".to_string(), Value::Array(bootnodes));
|
||||
} else {
|
||||
return Err(ChainConfigError::InvalidSpecFormat);
|
||||
}
|
||||
|
||||
Ok(ChainConfig {
|
||||
chain_spec: Cow::Owned(chain_spec_json.to_string()),
|
||||
})
|
||||
}
|
||||
|
||||
// Used internally to fetch the chain spec back out.
|
||||
pub(crate) fn as_chain_spec(&self) -> &str {
|
||||
&self.chain_spec
|
||||
}
|
||||
}
|
||||
-269
@@ -1,269 +0,0 @@
|
||||
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
//! A wrapper around [`smoldot_light`] which provides an light client capable of connecting
|
||||
//! to Substrate based chains.
|
||||
|
||||
#![deny(missing_docs)]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))]
|
||||
|
||||
#[cfg(any(
|
||||
all(feature = "web", feature = "native"),
|
||||
not(any(feature = "web", feature = "native"))
|
||||
))]
|
||||
compile_error!("subxt-lightclient: exactly one of the 'web' and 'native' features should be used.");
|
||||
|
||||
mod platform;
|
||||
mod shared_client;
|
||||
// mod receiver;
|
||||
mod background;
|
||||
mod chain_config;
|
||||
mod rpc;
|
||||
|
||||
use background::{BackgroundTask, BackgroundTaskHandle};
|
||||
use futures::Stream;
|
||||
use platform::DefaultPlatform;
|
||||
use serde_json::value::RawValue;
|
||||
use shared_client::SharedClient;
|
||||
use std::future::Future;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub use chain_config::{ChainConfig, ChainConfigError};
|
||||
|
||||
/// Things that can go wrong when constructing the [`LightClient`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum LightClientError {
|
||||
/// Error encountered while adding the chain to the light-client.
|
||||
#[error("Failed to add the chain to the light client: {0}.")]
|
||||
AddChainError(String),
|
||||
}
|
||||
|
||||
/// Things that can go wrong calling methods of [`LightClientRpc`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum LightClientRpcError {
|
||||
/// Error response from the JSON-RPC server.
|
||||
#[error(transparent)]
|
||||
JsonRpcError(JsonRpcError),
|
||||
/// Smoldot could not handle the RPC call.
|
||||
#[error("Smoldot could not handle the RPC call: {0}.")]
|
||||
SmoldotError(String),
|
||||
/// Background task dropped.
|
||||
#[error("The background task was dropped.")]
|
||||
BackgroundTaskDropped,
|
||||
}
|
||||
|
||||
/// An error response from the JSON-RPC server (ie smoldot) in response to
|
||||
/// a method call or as a subscription notification.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("RPC Error: {0}.")]
|
||||
pub struct JsonRpcError(Box<RawValue>);
|
||||
|
||||
impl JsonRpcError {
|
||||
/// Attempt to deserialize this error into some type.
|
||||
pub fn try_deserialize<'a, T: serde::de::Deserialize<'a>>(
|
||||
&'a self,
|
||||
) -> Result<T, serde_json::Error> {
|
||||
serde_json::from_str(self.0.get())
|
||||
}
|
||||
}
|
||||
|
||||
/// This represents a single light client connection to the network. Instantiate
|
||||
/// it with [`LightClient::relay_chain()`] to communicate with a relay chain, and
|
||||
/// then call [`LightClient::parachain()`] to establish connections to parachains.
|
||||
#[derive(Clone)]
|
||||
pub struct LightClient {
|
||||
client: SharedClient<DefaultPlatform>,
|
||||
relay_chain_id: smoldot_light::ChainId,
|
||||
}
|
||||
|
||||
impl LightClient {
|
||||
/// Given a chain spec, establish a connection to a relay chain. Any subsequent calls to
|
||||
/// [`LightClient::parachain()`] will set this as the relay chain.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The panic behaviour depends on the feature flag being used:
|
||||
///
|
||||
/// ## Native
|
||||
///
|
||||
/// Panics when called outside of a `tokio` runtime context.
|
||||
///
|
||||
/// ## Web
|
||||
///
|
||||
/// If smoldot panics, then the promise created will be leaked. For more details, see
|
||||
/// <https://docs.rs/wasm-bindgen-futures/latest/wasm_bindgen_futures/fn.future_to_promise.html>.
|
||||
pub fn relay_chain<'a>(
|
||||
chain_config: impl Into<ChainConfig<'a>>,
|
||||
) -> Result<(Self, LightClientRpc), LightClientError> {
|
||||
let mut client = smoldot_light::Client::new(platform::build_platform());
|
||||
let chain_config = chain_config.into();
|
||||
let chain_spec = chain_config.as_chain_spec();
|
||||
|
||||
let config = smoldot_light::AddChainConfig {
|
||||
specification: chain_spec,
|
||||
json_rpc: smoldot_light::AddChainConfigJsonRpc::Enabled {
|
||||
max_pending_requests: u32::MAX.try_into().unwrap(),
|
||||
max_subscriptions: u32::MAX,
|
||||
},
|
||||
database_content: "",
|
||||
potential_relay_chains: std::iter::empty(),
|
||||
user_data: (),
|
||||
};
|
||||
|
||||
let added_chain = client
|
||||
.add_chain(config)
|
||||
.map_err(|err| LightClientError::AddChainError(err.to_string()))?;
|
||||
|
||||
let relay_chain_id = added_chain.chain_id;
|
||||
let rpc_responses = added_chain
|
||||
.json_rpc_responses
|
||||
.expect("Light client RPC configured; qed");
|
||||
let shared_client: SharedClient<_> = client.into();
|
||||
|
||||
let light_client_rpc =
|
||||
LightClientRpc::new_raw(shared_client.clone(), relay_chain_id, rpc_responses);
|
||||
let light_client = Self {
|
||||
client: shared_client,
|
||||
relay_chain_id,
|
||||
};
|
||||
|
||||
Ok((light_client, light_client_rpc))
|
||||
}
|
||||
|
||||
/// Given a chain spec, establish a connection to a parachain.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The panic behaviour depends on the feature flag being used:
|
||||
///
|
||||
/// ## Native
|
||||
///
|
||||
/// Panics when called outside of a `tokio` runtime context.
|
||||
///
|
||||
/// ## Web
|
||||
///
|
||||
/// If smoldot panics, then the promise created will be leaked. For more details, see
|
||||
/// <https://docs.rs/wasm-bindgen-futures/latest/wasm_bindgen_futures/fn.future_to_promise.html>.
|
||||
pub fn parachain<'a>(
|
||||
&self,
|
||||
chain_config: impl Into<ChainConfig<'a>>,
|
||||
) -> Result<LightClientRpc, LightClientError> {
|
||||
let chain_config = chain_config.into();
|
||||
let chain_spec = chain_config.as_chain_spec();
|
||||
|
||||
let config = smoldot_light::AddChainConfig {
|
||||
specification: chain_spec,
|
||||
json_rpc: smoldot_light::AddChainConfigJsonRpc::Enabled {
|
||||
max_pending_requests: u32::MAX.try_into().unwrap(),
|
||||
max_subscriptions: u32::MAX,
|
||||
},
|
||||
database_content: "",
|
||||
potential_relay_chains: std::iter::once(self.relay_chain_id),
|
||||
user_data: (),
|
||||
};
|
||||
|
||||
let added_chain = self
|
||||
.client
|
||||
.add_chain(config)
|
||||
.map_err(|err| LightClientError::AddChainError(err.to_string()))?;
|
||||
|
||||
let chain_id = added_chain.chain_id;
|
||||
let rpc_responses = added_chain
|
||||
.json_rpc_responses
|
||||
.expect("Light client RPC configured; qed");
|
||||
|
||||
Ok(LightClientRpc::new_raw(
|
||||
self.client.clone(),
|
||||
chain_id,
|
||||
rpc_responses,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// This represents a single RPC connection to a specific chain, and is constructed by calling
|
||||
/// one of the methods on [`LightClient`]. Using this, you can make RPC requests to the chain.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LightClientRpc {
|
||||
handle: BackgroundTaskHandle,
|
||||
}
|
||||
|
||||
impl LightClientRpc {
|
||||
// Dev note: this would provide a "low level" interface if one is needed.
|
||||
// Do we actually need to provide this, or can we entirely hide Smoldot?
|
||||
pub(crate) fn new_raw<TPlat, TChain>(
|
||||
client: impl Into<SharedClient<TPlat, TChain>>,
|
||||
chain_id: smoldot_light::ChainId,
|
||||
rpc_responses: smoldot_light::JsonRpcResponses<TPlat>,
|
||||
) -> Self
|
||||
where
|
||||
TPlat: smoldot_light::platform::PlatformRef + Send + 'static,
|
||||
TChain: Send + 'static,
|
||||
{
|
||||
let (background_task, background_handle) =
|
||||
BackgroundTask::new(client.into(), chain_id, rpc_responses);
|
||||
|
||||
// For now we spawn the background task internally, but later we can expose
|
||||
// methods to give this back to the user so that they can exert backpressure.
|
||||
spawn(async move { background_task.run().await });
|
||||
|
||||
LightClientRpc {
|
||||
handle: background_handle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Make an RPC request to a chain, getting back a result.
|
||||
pub async fn request(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
) -> Result<Box<RawValue>, LightClientRpcError> {
|
||||
self.handle.request(method, params).await
|
||||
}
|
||||
|
||||
/// Subscribe to some RPC method, getting back a stream of notifications.
|
||||
pub async fn subscribe(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
unsub: String,
|
||||
) -> Result<LightClientRpcSubscription, LightClientRpcError> {
|
||||
let (id, notifications) = self.handle.subscribe(method, params, unsub).await?;
|
||||
Ok(LightClientRpcSubscription { id, notifications })
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream of notifications handed back when [`LightClientRpc::subscribe`] is called.
|
||||
pub struct LightClientRpcSubscription {
|
||||
notifications: mpsc::UnboundedReceiver<Result<Box<RawValue>, JsonRpcError>>,
|
||||
id: String,
|
||||
}
|
||||
|
||||
impl LightClientRpcSubscription {
|
||||
/// Return the subscription ID
|
||||
pub fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for LightClientRpcSubscription {
|
||||
type Item = Result<Box<RawValue>, JsonRpcError>;
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.notifications.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// A quick helper to spawn a task that works for WASM.
|
||||
fn spawn<F: Future + Send + 'static>(future: F) {
|
||||
#[cfg(feature = "native")]
|
||||
tokio::spawn(async move {
|
||||
future.await;
|
||||
});
|
||||
#[cfg(feature = "web")]
|
||||
wasm_bindgen_futures::spawn_local(async move {
|
||||
future.await;
|
||||
});
|
||||
}
|
||||
@@ -1,40 +0,0 @@
|
||||
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
//! Default platform for WASM environments.
|
||||
|
||||
#[cfg(feature = "web")]
|
||||
mod wasm_helpers;
|
||||
#[cfg(feature = "web")]
|
||||
mod wasm_platform;
|
||||
#[cfg(feature = "web")]
|
||||
mod wasm_socket;
|
||||
|
||||
pub use helpers::{DefaultPlatform, build_platform};
|
||||
|
||||
#[cfg(feature = "native")]
|
||||
mod helpers {
|
||||
use smoldot_light::platform::default::DefaultPlatform as Platform;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub type DefaultPlatform = Arc<Platform>;
|
||||
|
||||
pub fn build_platform() -> DefaultPlatform {
|
||||
Platform::new(
|
||||
"subxt-light-client".into(),
|
||||
env!("CARGO_PKG_VERSION").into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "web")]
|
||||
mod helpers {
|
||||
use super::wasm_platform::SubxtPlatform as Platform;
|
||||
|
||||
pub type DefaultPlatform = Platform;
|
||||
|
||||
pub fn build_platform() -> DefaultPlatform {
|
||||
Platform::new()
|
||||
}
|
||||
}
|
||||
@@ -1,42 +0,0 @@
|
||||
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
//! Wasm implementation for the light client's platform using
|
||||
//! custom websockets.
|
||||
|
||||
use super::wasm_socket::WasmSocket;
|
||||
|
||||
use core::time::Duration;
|
||||
use futures_util::{FutureExt, future};
|
||||
|
||||
pub fn now_from_unix_epoch() -> Duration {
|
||||
web_time::SystemTime::now()
|
||||
.duration_since(web_time::SystemTime::UNIX_EPOCH)
|
||||
.unwrap_or_else(|_| {
|
||||
panic!("Invalid systime cannot be configured earlier than `UNIX_EPOCH`")
|
||||
})
|
||||
}
|
||||
|
||||
pub type Instant = web_time::Instant;
|
||||
|
||||
pub fn now() -> Instant {
|
||||
web_time::Instant::now()
|
||||
}
|
||||
|
||||
pub type Delay = future::BoxFuture<'static, ()>;
|
||||
|
||||
pub fn sleep(duration: Duration) -> Delay {
|
||||
futures_timer::Delay::new(duration).boxed()
|
||||
}
|
||||
|
||||
/// Implementation detail of a stream from the `SubxtPlatform`.
|
||||
#[pin_project::pin_project]
|
||||
pub struct Stream(
|
||||
#[pin]
|
||||
pub smoldot::libp2p::with_buffers::WithBuffers<
|
||||
future::BoxFuture<'static, Result<WasmSocket, std::io::Error>>,
|
||||
WasmSocket,
|
||||
Instant,
|
||||
>,
|
||||
);
|
||||
@@ -1,226 +0,0 @@
|
||||
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use super::wasm_socket::WasmSocket;
|
||||
|
||||
use core::{
|
||||
fmt::{self, Write as _},
|
||||
net::IpAddr,
|
||||
time::Duration,
|
||||
};
|
||||
use futures::prelude::*;
|
||||
use smoldot::libp2p::with_buffers;
|
||||
use smoldot_light::platform::{
|
||||
Address, ConnectionType, LogLevel, MultiStreamAddress, MultiStreamWebRtcConnection,
|
||||
PlatformRef, SubstreamDirection,
|
||||
};
|
||||
|
||||
use std::{io, net::SocketAddr, pin::Pin};
|
||||
|
||||
const LOG_TARGET: &str = "subxt-platform-wasm";
|
||||
|
||||
/// Subxt platform implementation for wasm.
|
||||
///
|
||||
/// This implementation is a conversion of the implementation from the smoldot:
|
||||
/// https://github.com/smol-dot/smoldot/blob/6401d4df90569e23073d646b14a8fbf9f7e6bdd3/light-base/src/platform/default.rs#L83.
|
||||
///
|
||||
/// This platform will evolve over time and we'll need to keep this code in sync.
|
||||
#[derive(Clone)]
|
||||
pub struct SubxtPlatform {}
|
||||
|
||||
impl SubxtPlatform {
|
||||
pub fn new() -> Self {
|
||||
SubxtPlatform {}
|
||||
}
|
||||
}
|
||||
|
||||
impl PlatformRef for SubxtPlatform {
|
||||
type Delay = super::wasm_helpers::Delay;
|
||||
type Instant = super::wasm_helpers::Instant;
|
||||
type MultiStream = std::convert::Infallible;
|
||||
type Stream = super::wasm_helpers::Stream;
|
||||
type StreamConnectFuture = future::Ready<Self::Stream>;
|
||||
type MultiStreamConnectFuture = future::Pending<MultiStreamWebRtcConnection<Self::MultiStream>>;
|
||||
type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a, Self::Instant>;
|
||||
type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
|
||||
type StreamErrorRef<'a> = &'a std::io::Error;
|
||||
type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>;
|
||||
|
||||
fn now_from_unix_epoch(&self) -> Duration {
|
||||
super::wasm_helpers::now_from_unix_epoch()
|
||||
}
|
||||
|
||||
fn now(&self) -> Self::Instant {
|
||||
super::wasm_helpers::now()
|
||||
}
|
||||
|
||||
fn fill_random_bytes(&self, buffer: &mut [u8]) {
|
||||
// This could fail if the system does not have access to a good source of entropy.
|
||||
// Note: `rand::RngCore::fill_bytes` also panics on errors and `rand::OsCore` calls
|
||||
// identically into `getrandom::getrandom`.
|
||||
getrandom::getrandom(buffer).expect("Cannot fill random bytes");
|
||||
}
|
||||
|
||||
fn sleep(&self, duration: Duration) -> Self::Delay {
|
||||
super::wasm_helpers::sleep(duration)
|
||||
}
|
||||
|
||||
fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
|
||||
self.sleep(when.saturating_duration_since(self.now()))
|
||||
}
|
||||
|
||||
fn spawn_task(
|
||||
&self,
|
||||
_task_name: std::borrow::Cow<'_, str>,
|
||||
task: impl future::Future<Output = ()> + Send + 'static,
|
||||
) {
|
||||
wasm_bindgen_futures::spawn_local(task);
|
||||
}
|
||||
|
||||
fn client_name(&self) -> std::borrow::Cow<'_, str> {
|
||||
"subxt-light-client".into()
|
||||
}
|
||||
|
||||
fn client_version(&self) -> std::borrow::Cow<'_, str> {
|
||||
env!("CARGO_PKG_VERSION").into()
|
||||
}
|
||||
|
||||
fn supports_connection_type(&self, connection_type: ConnectionType) -> bool {
|
||||
let result = matches!(
|
||||
connection_type,
|
||||
ConnectionType::WebSocketIpv4 { .. }
|
||||
| ConnectionType::WebSocketIpv6 { .. }
|
||||
| ConnectionType::WebSocketDns { .. }
|
||||
);
|
||||
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Supports connection type={:?} result={}",
|
||||
connection_type, result
|
||||
);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture {
|
||||
tracing::trace!(target: LOG_TARGET, "Connect stream to multiaddr={:?}", multiaddr);
|
||||
|
||||
// `PlatformRef` trait guarantees that `connect_stream` is only called with addresses
|
||||
// stated in `supports_connection_type`.
|
||||
let addr = match multiaddr {
|
||||
Address::WebSocketDns {
|
||||
hostname,
|
||||
port,
|
||||
secure: true,
|
||||
} => {
|
||||
format!("wss://{hostname}:{port}")
|
||||
}
|
||||
Address::WebSocketDns {
|
||||
hostname,
|
||||
port,
|
||||
secure: false,
|
||||
} => {
|
||||
format!("ws://{hostname}:{port}")
|
||||
}
|
||||
Address::WebSocketIp {
|
||||
ip: IpAddr::V4(ip),
|
||||
port,
|
||||
} => {
|
||||
let addr = SocketAddr::from((ip, port));
|
||||
format!("ws://{addr}")
|
||||
}
|
||||
Address::WebSocketIp {
|
||||
ip: IpAddr::V6(ip),
|
||||
port,
|
||||
} => {
|
||||
let addr = SocketAddr::from((ip, port));
|
||||
format!("ws://{addr}")
|
||||
}
|
||||
|
||||
// The API user of the `PlatformRef` trait is never supposed to open connections of
|
||||
// a type that isn't supported.
|
||||
_ => {
|
||||
unreachable!(
|
||||
"Connecting to an address not supported. This code path indicates a bug in smoldot. Please raise an issue at https://github.com/smol-dot/smoldot/issues"
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let socket_future = async move {
|
||||
tracing::debug!(target: LOG_TARGET, "Connecting to addr={addr}");
|
||||
WasmSocket::new(addr.as_str()).map_err(|err| std::io::Error::other(err.to_string()))
|
||||
};
|
||||
|
||||
future::ready(super::wasm_helpers::Stream(with_buffers::WithBuffers::new(
|
||||
Box::pin(socket_future),
|
||||
)))
|
||||
}
|
||||
|
||||
fn connect_multistream(&self, _address: MultiStreamAddress) -> Self::MultiStreamConnectFuture {
|
||||
panic!(
|
||||
"Multistreams are not currently supported. This code path indicates a bug in smoldot. Please raise an issue at https://github.com/smol-dot/smoldot/issues"
|
||||
)
|
||||
}
|
||||
|
||||
fn open_out_substream(&self, c: &mut Self::MultiStream) {
|
||||
// This function can only be called with so-called "multi-stream" connections. We never
|
||||
// open such connection.
|
||||
match *c {}
|
||||
}
|
||||
|
||||
fn next_substream(&self, c: &'_ mut Self::MultiStream) -> Self::NextSubstreamFuture<'_> {
|
||||
// This function can only be called with so-called "multi-stream" connections. We never
|
||||
// open such connection.
|
||||
match *c {}
|
||||
}
|
||||
|
||||
fn read_write_access<'a>(
|
||||
&self,
|
||||
stream: Pin<&'a mut Self::Stream>,
|
||||
) -> Result<Self::ReadWriteAccess<'a>, &'a io::Error> {
|
||||
let stream = stream.project();
|
||||
stream.0.read_write_access(Self::Instant::now())
|
||||
}
|
||||
|
||||
fn wait_read_write_again<'a>(
|
||||
&self,
|
||||
stream: Pin<&'a mut Self::Stream>,
|
||||
) -> Self::StreamUpdateFuture<'a> {
|
||||
let stream = stream.project();
|
||||
Box::pin(stream.0.wait_read_write_again(|when| async move {
|
||||
let now = super::wasm_helpers::now();
|
||||
let duration = when.saturating_duration_since(now);
|
||||
super::wasm_helpers::sleep(duration).await;
|
||||
}))
|
||||
}
|
||||
|
||||
fn log<'a>(
|
||||
&self,
|
||||
log_level: LogLevel,
|
||||
log_target: &'a str,
|
||||
message: &'a str,
|
||||
key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>,
|
||||
) {
|
||||
let mut message_build = String::with_capacity(128);
|
||||
message_build.push_str(message);
|
||||
let mut first = true;
|
||||
for (key, value) in key_values {
|
||||
if first {
|
||||
let _ = write!(message_build, "; ");
|
||||
first = false;
|
||||
} else {
|
||||
let _ = write!(message_build, ", ");
|
||||
}
|
||||
let _ = write!(message_build, "{key}={value}");
|
||||
}
|
||||
|
||||
match log_level {
|
||||
LogLevel::Error => tracing::error!("target={log_target} {message_build}"),
|
||||
LogLevel::Warn => tracing::warn!("target={log_target} {message_build}"),
|
||||
LogLevel::Info => tracing::info!("target={log_target} {message_build}"),
|
||||
LogLevel::Debug => tracing::debug!("target={log_target} {message_build}"),
|
||||
LogLevel::Trace => tracing::trace!("target={log_target} {message_build}"),
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,247 +0,0 @@
|
||||
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use futures::{io, prelude::*};
|
||||
use send_wrapper::SendWrapper;
|
||||
use wasm_bindgen::{JsCast, prelude::*};
|
||||
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
pin::Pin,
|
||||
sync::{Arc, Mutex},
|
||||
task::Poll,
|
||||
task::{Context, Waker},
|
||||
};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("Failed to connect {0}")]
|
||||
ConnectionError(String),
|
||||
}
|
||||
|
||||
/// Websocket for WASM environments.
|
||||
///
|
||||
/// This is a rust-based wrapper around browser's WebSocket API.
|
||||
///
|
||||
// Warning: It is not safe to have `Clone` on this structure.
|
||||
pub struct WasmSocket {
|
||||
/// Inner data shared between `poll` and web_sys callbacks.
|
||||
inner: Arc<Mutex<InnerWasmSocket>>,
|
||||
/// This implements `Send` and panics if the value is accessed
|
||||
/// or dropped from another thread.
|
||||
///
|
||||
/// This is safe in wasm environments.
|
||||
socket: SendWrapper<web_sys::WebSocket>,
|
||||
/// In memory callbacks to handle messages from the browser socket.
|
||||
_callbacks: SendWrapper<Callbacks>,
|
||||
}
|
||||
|
||||
/// The state of the [`WasmSocket`].
|
||||
#[derive(PartialEq, Eq, Clone, Copy)]
|
||||
enum ConnectionState {
|
||||
/// Initial state of the socket.
|
||||
Connecting,
|
||||
/// Socket is fully opened.
|
||||
Opened,
|
||||
/// Socket is closed.
|
||||
Closed,
|
||||
/// Error reported by callbacks.
|
||||
Error,
|
||||
}
|
||||
|
||||
struct InnerWasmSocket {
|
||||
/// The state of the connection.
|
||||
state: ConnectionState,
|
||||
/// Data buffer for the socket.
|
||||
data: VecDeque<u8>,
|
||||
/// Waker from `poll_read` / `poll_write`.
|
||||
waker: Option<Waker>,
|
||||
}
|
||||
|
||||
/// Registered callbacks of the [`WasmSocket`].
|
||||
///
|
||||
/// These need to be kept around until the socket is dropped.
|
||||
type Callbacks = (
|
||||
Closure<dyn FnMut()>,
|
||||
Closure<dyn FnMut(web_sys::MessageEvent)>,
|
||||
Closure<dyn FnMut(web_sys::Event)>,
|
||||
Closure<dyn FnMut(web_sys::CloseEvent)>,
|
||||
);
|
||||
|
||||
impl WasmSocket {
|
||||
/// Establish a WebSocket connection.
|
||||
///
|
||||
/// The error is a string representing the browser error.
|
||||
/// Visit [MDN Documentation](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/WebSocket#exceptions_thrown)
|
||||
/// for more info.
|
||||
pub fn new(addr: &str) -> Result<Self, Error> {
|
||||
let socket = match web_sys::WebSocket::new(addr) {
|
||||
Ok(socket) => socket,
|
||||
Err(err) => return Err(Error::ConnectionError(format!("{err:?}"))),
|
||||
};
|
||||
|
||||
socket.set_binary_type(web_sys::BinaryType::Arraybuffer);
|
||||
|
||||
let inner = Arc::new(Mutex::new(InnerWasmSocket {
|
||||
state: ConnectionState::Connecting,
|
||||
data: VecDeque::with_capacity(16384),
|
||||
waker: None,
|
||||
}));
|
||||
|
||||
let open_callback = Closure::<dyn FnMut()>::new({
|
||||
let inner = inner.clone();
|
||||
move || {
|
||||
let mut inner = inner.lock().expect("Mutex is poised; qed");
|
||||
inner.state = ConnectionState::Opened;
|
||||
|
||||
if let Some(waker) = inner.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
});
|
||||
socket.set_onopen(Some(open_callback.as_ref().unchecked_ref()));
|
||||
|
||||
let message_callback = Closure::<dyn FnMut(_)>::new({
|
||||
let inner = inner.clone();
|
||||
move |event: web_sys::MessageEvent| {
|
||||
let Ok(buffer) = event.data().dyn_into::<js_sys::ArrayBuffer>() else {
|
||||
panic!("Unexpected data format {:?}", event.data());
|
||||
};
|
||||
|
||||
let mut inner = inner.lock().expect("Mutex is poised; qed");
|
||||
let bytes = js_sys::Uint8Array::new(&buffer).to_vec();
|
||||
inner.data.extend(bytes);
|
||||
|
||||
if let Some(waker) = inner.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
});
|
||||
socket.set_onmessage(Some(message_callback.as_ref().unchecked_ref()));
|
||||
|
||||
let error_callback = Closure::<dyn FnMut(_)>::new({
|
||||
let inner = inner.clone();
|
||||
move |_event: web_sys::Event| {
|
||||
// Callback does not provide useful information, signal it back to the stream.
|
||||
let mut inner = inner.lock().expect("Mutex is poised; qed");
|
||||
inner.state = ConnectionState::Error;
|
||||
|
||||
if let Some(waker) = inner.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
});
|
||||
socket.set_onerror(Some(error_callback.as_ref().unchecked_ref()));
|
||||
|
||||
let close_callback = Closure::<dyn FnMut(_)>::new({
|
||||
let inner = inner.clone();
|
||||
move |_event: web_sys::CloseEvent| {
|
||||
let mut inner = inner.lock().expect("Mutex is poised; qed");
|
||||
inner.state = ConnectionState::Closed;
|
||||
|
||||
if let Some(waker) = inner.waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
});
|
||||
socket.set_onclose(Some(close_callback.as_ref().unchecked_ref()));
|
||||
|
||||
let callbacks = (
|
||||
open_callback,
|
||||
message_callback,
|
||||
error_callback,
|
||||
close_callback,
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
inner,
|
||||
socket: SendWrapper::new(socket),
|
||||
_callbacks: SendWrapper::new(callbacks),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for WasmSocket {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
let mut inner = self.inner.lock().expect("Mutex is poised; qed");
|
||||
inner.waker = Some(cx.waker().clone());
|
||||
|
||||
if self.socket.ready_state() == web_sys::WebSocket::CONNECTING {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
match inner.state {
|
||||
ConnectionState::Error => Poll::Ready(Err(io::Error::other("Socket error"))),
|
||||
ConnectionState::Closed => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
|
||||
ConnectionState::Connecting => Poll::Pending,
|
||||
ConnectionState::Opened => {
|
||||
if inner.data.is_empty() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
let n = inner.data.len().min(buf.len());
|
||||
for k in buf.iter_mut().take(n) {
|
||||
*k = inner.data.pop_front().expect("Buffer non empty; qed");
|
||||
}
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for WasmSocket {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
let mut inner = self.inner.lock().expect("Mutex is poised; qed");
|
||||
inner.waker = Some(cx.waker().clone());
|
||||
|
||||
match inner.state {
|
||||
ConnectionState::Error => Poll::Ready(Err(io::Error::other("Socket error"))),
|
||||
ConnectionState::Closed => Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())),
|
||||
ConnectionState::Connecting => Poll::Pending,
|
||||
ConnectionState::Opened => match self.socket.send_with_u8_array(buf) {
|
||||
Ok(()) => Poll::Ready(Ok(buf.len())),
|
||||
Err(err) => Poll::Ready(Err(io::Error::other(format!("Write error: {err:?}")))),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
if self.socket.ready_state() == web_sys::WebSocket::CLOSED {
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
|
||||
if self.socket.ready_state() != web_sys::WebSocket::CLOSING {
|
||||
let _ = self.socket.close();
|
||||
}
|
||||
|
||||
let mut inner = self.inner.lock().expect("Mutex is poised; qed");
|
||||
inner.waker = Some(cx.waker().clone());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WasmSocket {
|
||||
fn drop(&mut self) {
|
||||
if self.socket.ready_state() != web_sys::WebSocket::CLOSING {
|
||||
let _ = self.socket.close();
|
||||
}
|
||||
|
||||
self.socket.set_onopen(None);
|
||||
self.socket.set_onmessage(None);
|
||||
self.socket.set_onerror(None);
|
||||
self.socket.set_onclose(None);
|
||||
}
|
||||
}
|
||||
-132
@@ -1,132 +0,0 @@
|
||||
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
/// The RPC response from the light-client.
|
||||
/// This can either be a response of a method, or a notification from a subscription.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RpcResponse {
|
||||
Method {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// The result of the method call.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
MethodError {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// Error.
|
||||
error: Box<RawValue>,
|
||||
},
|
||||
Notification {
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Subscription ID.
|
||||
subscription_id: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
NotificationError {
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Subscription ID.
|
||||
subscription_id: String,
|
||||
/// Result.
|
||||
error: Box<RawValue>,
|
||||
},
|
||||
}
|
||||
|
||||
impl std::str::FromStr for RpcResponse {
|
||||
type Err = ();
|
||||
|
||||
fn from_str(response: &str) -> Result<Self, Self::Err> {
|
||||
// Valid response
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct Response {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
id: String,
|
||||
result: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Error response
|
||||
#[derive(Deserialize)]
|
||||
struct ResponseError {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
id: String,
|
||||
error: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Valid notification (subscription) response
|
||||
#[derive(Deserialize)]
|
||||
struct Notification {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: NotificationResultParams,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationResultParams {
|
||||
subscription: String,
|
||||
result: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Error notification (subscription) response
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationError {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: NotificationErrorParams,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationErrorParams {
|
||||
/// The ID of the subscription.
|
||||
subscription: String,
|
||||
error: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Try deserializing the response payload to one of the above. We can
|
||||
// do this more efficiently eg how jsonrpsee_types does.
|
||||
|
||||
let result: Result<Response, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::Method {
|
||||
id: response.id,
|
||||
result: response.result,
|
||||
});
|
||||
}
|
||||
let result: Result<Notification, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::Notification {
|
||||
subscription_id: response.params.subscription,
|
||||
method: response.method,
|
||||
result: response.params.result,
|
||||
});
|
||||
}
|
||||
let result: Result<ResponseError, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::MethodError {
|
||||
id: response.id,
|
||||
error: response.error,
|
||||
});
|
||||
}
|
||||
let result: Result<NotificationError, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::NotificationError {
|
||||
method: response.method,
|
||||
subscription_id: response.params.subscription,
|
||||
error: response.params.error,
|
||||
});
|
||||
}
|
||||
|
||||
// We couldn't decode into any of the above. We could pick one of the above`
|
||||
// errors to return, but there's no real point since the string is obviously
|
||||
// different from any of them.
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
@@ -1,47 +0,0 @@
|
||||
// Copyright 2019-2025 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use smoldot_light as sl;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// This wraps [`smoldot_light::Client`] so that it can be cloned and shared.
|
||||
#[derive(Clone)]
|
||||
pub struct SharedClient<TPlat: sl::platform::PlatformRef, TChain = ()> {
|
||||
client: Arc<Mutex<sl::Client<TPlat, TChain>>>,
|
||||
}
|
||||
|
||||
impl<TPlat: sl::platform::PlatformRef, TChain> From<sl::Client<TPlat, TChain>>
|
||||
for SharedClient<TPlat, TChain>
|
||||
{
|
||||
fn from(client: sl::Client<TPlat, TChain>) -> Self {
|
||||
SharedClient {
|
||||
client: Arc::new(Mutex::new(client)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TPlat: sl::platform::PlatformRef, TChain> SharedClient<TPlat, TChain> {
|
||||
/// Delegates to [`smoldot_light::Client::json_rpc_request()`].
|
||||
pub(crate) fn json_rpc_request(
|
||||
&self,
|
||||
json_rpc_request: impl Into<String>,
|
||||
chain_id: sl::ChainId,
|
||||
) -> Result<(), sl::HandleRpcError> {
|
||||
self.client
|
||||
.lock()
|
||||
.expect("mutex should not be poisoned")
|
||||
.json_rpc_request(json_rpc_request, chain_id)
|
||||
}
|
||||
|
||||
/// Delegates to [`smoldot_light::Client::add_chain()`].
|
||||
pub(crate) fn add_chain(
|
||||
&self,
|
||||
config: sl::AddChainConfig<'_, TChain, impl Iterator<Item = sl::ChainId>>,
|
||||
) -> Result<sl::AddChainSuccess<TPlat>, sl::AddChainError> {
|
||||
self.client
|
||||
.lock()
|
||||
.expect("mutex should not be poisoned")
|
||||
.add_chain(config)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user