mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 01:11:10 +00:00
Merge remote-tracking branch 'origin/master' into lexnv/update-smoldot
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
+9
-21
@@ -24,23 +24,14 @@ default = ["native"]
|
||||
# Exactly 1 of "web" and "native" is expected.
|
||||
native = [
|
||||
"smoldot-light/std",
|
||||
"tokio-stream",
|
||||
"tokio/sync",
|
||||
"tokio/rt",
|
||||
"futures-util",
|
||||
]
|
||||
|
||||
# Enable this for web/wasm builds.
|
||||
# Exactly 1 of "web" and "native" is expected.
|
||||
web = [
|
||||
"getrandom/js",
|
||||
|
||||
"smoldot",
|
||||
"smoldot/std",
|
||||
"smoldot-light",
|
||||
"tokio-stream",
|
||||
"tokio/sync",
|
||||
"futures-util",
|
||||
|
||||
# For the light-client platform.
|
||||
"wasm-bindgen-futures",
|
||||
@@ -56,29 +47,26 @@ web = [
|
||||
]
|
||||
|
||||
[dependencies]
|
||||
futures = { workspace = true }
|
||||
futures = { workspace = true, features = ["async-await"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true, features = ["raw_value"] }
|
||||
serde_json = { workspace = true, features = ["default", "raw_value"] }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
smoldot-light = { workspace = true }
|
||||
tokio-stream = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
futures-util = { workspace = true }
|
||||
|
||||
# Light client support:
|
||||
smoldot = { workspace = true, optional = true }
|
||||
smoldot-light = { workspace = true, optional = true }
|
||||
either = { workspace = true, optional = true }
|
||||
tokio = { workspace = true, optional = true }
|
||||
tokio-stream = { workspace = true, optional = true }
|
||||
futures-util = { workspace = true, optional = true }
|
||||
# Only needed for web
|
||||
js-sys = { workspace = true, optional = true }
|
||||
send_wrapper = { workspace = true, optional = true }
|
||||
web-sys = { workspace = true, optional = true }
|
||||
wasm-bindgen = { workspace = true, optional = true }
|
||||
wasm-bindgen-futures = { workspace = true, optional = true }
|
||||
smoldot = { workspace = true, optional = true }
|
||||
pin-project = { workspace = true, optional = true }
|
||||
futures-timer = { workspace = true, optional = true }
|
||||
instant = { workspace = true, optional = true }
|
||||
pin-project = { workspace = true, optional = true }
|
||||
|
||||
# Included if "web" feature is enabled, to enable its js feature.
|
||||
getrandom = { workspace = true, optional = true }
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
|
||||
+331
-344
@@ -1,43 +1,47 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use futures::stream::StreamExt;
|
||||
use futures_util::future::{self, Either};
|
||||
use serde::Deserialize;
|
||||
use crate::rpc::RpcResponse;
|
||||
use crate::shared_client::SharedClient;
|
||||
use crate::{JsonRpcError, LightClientRpcError};
|
||||
use futures::{stream::StreamExt, FutureExt};
|
||||
use serde_json::value::RawValue;
|
||||
use smoldot_light::platform::PlatformRef;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
use crate::client::AddedChain;
|
||||
const LOG_TARGET: &str = "subxt-light-client-background-task";
|
||||
|
||||
use super::LightClientRpcError;
|
||||
use smoldot_light::ChainId;
|
||||
|
||||
const LOG_TARGET: &str = "subxt-light-client-background";
|
||||
|
||||
/// The response of an RPC method.
|
||||
/// Response from [`BackgroundTaskHandle::request()`].
|
||||
pub type MethodResponse = Result<Box<RawValue>, LightClientRpcError>;
|
||||
|
||||
/// Response from [`BackgroundTaskHandle::subscribe()`].
|
||||
pub type SubscriptionResponse = Result<
|
||||
(
|
||||
SubscriptionId,
|
||||
mpsc::UnboundedReceiver<Result<Box<RawValue>, JsonRpcError>>,
|
||||
),
|
||||
LightClientRpcError,
|
||||
>;
|
||||
|
||||
/// Type of subscription IDs we can get back.
|
||||
pub type SubscriptionId = String;
|
||||
|
||||
/// Message protocol between the front-end client that submits the RPC requests
|
||||
/// and the backend handler that produces responses from the chain.
|
||||
///
|
||||
/// The light client uses a single object [`smoldot_light::JsonRpcResponses`] to
|
||||
/// handle all requests and subscriptions from a chain. A background task is spawned
|
||||
/// to multiplex the rpc responses and to provide them back to their rightful submitters.
|
||||
/// and the background task which fetches responses from Smoldot. Hidden behind
|
||||
/// the [`BackgroundTaskHandle`].
|
||||
#[derive(Debug)]
|
||||
pub enum FromSubxt {
|
||||
enum Message {
|
||||
/// The RPC method request.
|
||||
Request {
|
||||
/// The method of the request.
|
||||
method: String,
|
||||
/// The parameters of the request.
|
||||
params: String,
|
||||
/// Channel used to send back the result.
|
||||
params: Option<Box<RawValue>>,
|
||||
/// Channel used to send back the method response.
|
||||
sender: oneshot::Sender<MethodResponse>,
|
||||
/// The ID of the chain used to identify the chain.
|
||||
chain_id: ChainId,
|
||||
},
|
||||
/// The RPC subscription (pub/sub) request.
|
||||
Subscription {
|
||||
@@ -46,37 +50,175 @@ pub enum FromSubxt {
|
||||
/// The method to unsubscribe.
|
||||
unsubscribe_method: String,
|
||||
/// The parameters of the request.
|
||||
params: String,
|
||||
/// Channel used to send back the subscription ID if successful.
|
||||
sub_id: oneshot::Sender<MethodResponse>,
|
||||
/// Channel used to send back the notifications.
|
||||
sender: mpsc::UnboundedSender<Box<RawValue>>,
|
||||
/// The ID of the chain used to identify the chain.
|
||||
chain_id: ChainId,
|
||||
params: Option<Box<RawValue>>,
|
||||
/// Channel used to send back the subscription response.
|
||||
sender: oneshot::Sender<SubscriptionResponse>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Background task data.
|
||||
/// A handle to communicate with the background task.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BackgroundTaskHandle {
|
||||
to_backend: mpsc::UnboundedSender<Message>,
|
||||
}
|
||||
|
||||
impl BackgroundTaskHandle {
|
||||
/// Make an RPC request via the background task.
|
||||
pub async fn request(&self, method: String, params: Option<Box<RawValue>>) -> MethodResponse {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_backend
|
||||
.send(Message::Request {
|
||||
method,
|
||||
params,
|
||||
sender: tx,
|
||||
})
|
||||
.map_err(|_e| LightClientRpcError::BackgroundTaskDropped)?;
|
||||
|
||||
match rx.await {
|
||||
Err(_e) => Err(LightClientRpcError::BackgroundTaskDropped),
|
||||
Ok(response) => response,
|
||||
}
|
||||
}
|
||||
|
||||
/// Subscribe to some RPC method via the background task.
|
||||
pub async fn subscribe(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
unsubscribe_method: String,
|
||||
) -> SubscriptionResponse {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_backend
|
||||
.send(Message::Subscription {
|
||||
method,
|
||||
params,
|
||||
unsubscribe_method,
|
||||
sender: tx,
|
||||
})
|
||||
.map_err(|_e| LightClientRpcError::BackgroundTaskDropped)?;
|
||||
|
||||
match rx.await {
|
||||
Err(_e) => Err(LightClientRpcError::BackgroundTaskDropped),
|
||||
Ok(response) => response,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A background task which runs with [`BackgroundTask::run()`] and manages messages
|
||||
/// coming to/from Smoldot.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
|
||||
/// Smoldot light client implementation that leverages the exposed platform.
|
||||
client: smoldot_light::Client<TPlatform, TChain>,
|
||||
/// Generates an unique monotonically increasing ID for each chain.
|
||||
request_id_per_chain: HashMap<smoldot_light::ChainId, usize>,
|
||||
channels: BackgroundTaskChannels,
|
||||
data: BackgroundTaskData<TPlatform, TChain>,
|
||||
}
|
||||
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
/// Constructs a new [`BackgroundTask`].
|
||||
pub(crate) fn new(
|
||||
client: SharedClient<TPlatform, TChain>,
|
||||
chain_id: smoldot_light::ChainId,
|
||||
from_back: smoldot_light::JsonRpcResponses,
|
||||
) -> (BackgroundTask<TPlatform, TChain>, BackgroundTaskHandle) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let bg_task = BackgroundTask {
|
||||
channels: BackgroundTaskChannels {
|
||||
from_front: UnboundedReceiverStream::new(rx),
|
||||
from_back,
|
||||
},
|
||||
data: BackgroundTaskData {
|
||||
client,
|
||||
chain_id,
|
||||
last_request_id: 0,
|
||||
pending_subscriptions: HashMap::new(),
|
||||
requests: HashMap::new(),
|
||||
subscriptions: HashMap::new(),
|
||||
},
|
||||
};
|
||||
|
||||
let bg_handle = BackgroundTaskHandle { to_backend: tx };
|
||||
|
||||
(bg_task, bg_handle)
|
||||
}
|
||||
|
||||
/// Run the background task, which:
|
||||
/// - Forwards messages/subscription requests to Smoldot from the front end.
|
||||
/// - Forwards responses back from Smoldot to the front end.
|
||||
pub async fn run(self) {
|
||||
let chain_id = self.data.chain_id;
|
||||
let mut channels = self.channels;
|
||||
let mut data = self.data;
|
||||
|
||||
loop {
|
||||
tokio::pin! {
|
||||
let from_front_fut = channels.from_front.next().fuse();
|
||||
let from_back_fut = channels.from_back.next().fuse();
|
||||
}
|
||||
|
||||
futures::select! {
|
||||
// Message coming from the front end/client.
|
||||
front_message = from_front_fut => {
|
||||
let Some(message) = front_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Subxt channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received register message {:?}",
|
||||
message
|
||||
);
|
||||
|
||||
data.handle_requests(message).await;
|
||||
},
|
||||
// Message coming from Smoldot.
|
||||
back_message = from_back_fut => {
|
||||
let Some(back_message) = back_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received smoldot RPC chain {:?} result {:?}",
|
||||
chain_id, back_message
|
||||
);
|
||||
|
||||
data.handle_rpc_response(back_message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Task closed");
|
||||
}
|
||||
}
|
||||
|
||||
struct BackgroundTaskChannels {
|
||||
/// Messages sent into this background task from the front end.
|
||||
from_front: UnboundedReceiverStream<Message>,
|
||||
/// Messages sent into the background task from Smoldot.
|
||||
from_back: smoldot_light::JsonRpcResponses,
|
||||
}
|
||||
|
||||
struct BackgroundTaskData<TPlatform: PlatformRef, TChain> {
|
||||
/// A smoldot light client that can be shared.
|
||||
client: SharedClient<TPlatform, TChain>,
|
||||
/// Knowing the chain ID helps with debugging, but isn't overwise necessary.
|
||||
chain_id: smoldot_light::ChainId,
|
||||
/// Know which Id to use next for new requests/subscriptions.
|
||||
last_request_id: usize,
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
requests: HashMap<(usize, smoldot_light::ChainId), oneshot::Sender<MethodResponse>>,
|
||||
requests: HashMap<usize, oneshot::Sender<MethodResponse>>,
|
||||
/// Subscription calls first need to make a plain RPC method
|
||||
/// request to obtain the subscription ID.
|
||||
///
|
||||
/// The RPC method request is made in the background and the response should
|
||||
/// not be sent back to the user.
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>,
|
||||
pending_subscriptions: HashMap<usize, PendingSubscription>,
|
||||
/// Map the subscription ID to the frontend `Sender`.
|
||||
///
|
||||
/// The subscription ID is entirely generated by the node (smoldot). Therefore, it is
|
||||
/// possible for two distinct subscriptions of different chains to have the same subscription ID.
|
||||
subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>,
|
||||
subscriptions: HashMap<String, ActiveSubscription>,
|
||||
}
|
||||
|
||||
/// The state needed to resolve the subscription ID and send
|
||||
@@ -85,66 +227,49 @@ struct PendingSubscription {
|
||||
/// Send the method response ID back to the user.
|
||||
///
|
||||
/// It contains the subscription ID if successful, or an JSON RPC error object.
|
||||
sub_id_sender: oneshot::Sender<MethodResponse>,
|
||||
/// The subscription state that is added to the `subscriptions` map only
|
||||
/// if the subscription ID is successfully sent back to the user.
|
||||
subscription_state: ActiveSubscription,
|
||||
}
|
||||
|
||||
impl PendingSubscription {
|
||||
/// Transforms the pending subscription into an active subscription.
|
||||
fn into_parts(self) -> (oneshot::Sender<MethodResponse>, ActiveSubscription) {
|
||||
(self.sub_id_sender, self.subscription_state)
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of the subscription.
|
||||
struct ActiveSubscription {
|
||||
/// Channel to send the subscription notifications back to frontend.
|
||||
sender: mpsc::UnboundedSender<Box<RawValue>>,
|
||||
response_sender: oneshot::Sender<SubscriptionResponse>,
|
||||
/// The unsubscribe method to call when the user drops the receiver
|
||||
/// part of the channel.
|
||||
unsubscribe_method: String,
|
||||
}
|
||||
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
/// Constructs a new [`BackgroundTask`].
|
||||
pub fn new(
|
||||
client: smoldot_light::Client<TPlatform, TChain>,
|
||||
) -> BackgroundTask<TPlatform, TChain> {
|
||||
BackgroundTask {
|
||||
client,
|
||||
request_id_per_chain: Default::default(),
|
||||
requests: Default::default(),
|
||||
id_to_subscription: Default::default(),
|
||||
subscriptions: Default::default(),
|
||||
}
|
||||
}
|
||||
/// The state of the subscription.
|
||||
struct ActiveSubscription {
|
||||
/// Channel to send the subscription notifications back to frontend.
|
||||
notification_sender: mpsc::UnboundedSender<Result<Box<RawValue>, JsonRpcError>>,
|
||||
/// The unsubscribe method to call when the user drops the receiver
|
||||
/// part of the channel.
|
||||
unsubscribe_method: String,
|
||||
}
|
||||
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTaskData<TPlatform, TChain> {
|
||||
/// Fetch and increment the request ID.
|
||||
fn next_id(&mut self, chain_id: smoldot_light::ChainId) -> usize {
|
||||
let next = self.request_id_per_chain.entry(chain_id).or_insert(1);
|
||||
let id = *next;
|
||||
*next = next.wrapping_add(1);
|
||||
id
|
||||
fn next_id(&mut self) -> usize {
|
||||
self.last_request_id = self.last_request_id.wrapping_add(1);
|
||||
self.last_request_id
|
||||
}
|
||||
|
||||
/// Handle the registration messages received from the user.
|
||||
async fn handle_requests(&mut self, message: FromSubxt) {
|
||||
async fn handle_requests(&mut self, message: Message) {
|
||||
match message {
|
||||
FromSubxt::Request {
|
||||
Message::Request {
|
||||
method,
|
||||
params,
|
||||
sender,
|
||||
chain_id,
|
||||
} => {
|
||||
let id = self.next_id(chain_id);
|
||||
let id = self.next_id();
|
||||
let chain_id = self.chain_id;
|
||||
|
||||
let params = match ¶ms {
|
||||
Some(params) => params.get(),
|
||||
None => "null",
|
||||
};
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
|
||||
id, method, params
|
||||
);
|
||||
|
||||
self.requests.insert((id, chain_id), sender);
|
||||
self.requests.insert(id, sender);
|
||||
tracing::trace!(target: LOG_TARGET, "Tracking request id={id} chain={chain_id:?}");
|
||||
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
@@ -157,12 +282,12 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
|
||||
let sender = self
|
||||
.requests
|
||||
.remove(&(id, chain_id))
|
||||
.remove(&id)
|
||||
.expect("Channel is inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
if sender
|
||||
.send(Err(LightClientRpcError::Request(err.to_string())))
|
||||
.send(Err(LightClientRpcError::SmoldotError(err.to_string())))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
@@ -174,32 +299,32 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
tracing::trace!(target: LOG_TARGET, "Submitted to smoldot request with id={id}");
|
||||
}
|
||||
}
|
||||
FromSubxt::Subscription {
|
||||
Message::Subscription {
|
||||
method,
|
||||
unsubscribe_method,
|
||||
params,
|
||||
sub_id,
|
||||
sender,
|
||||
chain_id,
|
||||
} => {
|
||||
let id = self.next_id();
|
||||
let chain_id = self.chain_id;
|
||||
|
||||
// For subscriptions we need to make a plain RPC request to the subscription method.
|
||||
// The server will return as a result the subscription ID.
|
||||
let id = self.next_id(chain_id);
|
||||
let params = match ¶ms {
|
||||
Some(params) => params.get(),
|
||||
None => "null",
|
||||
};
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
|
||||
id, method, params
|
||||
);
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}");
|
||||
let subscription_id_state = PendingSubscription {
|
||||
sub_id_sender: sub_id,
|
||||
subscription_state: ActiveSubscription {
|
||||
sender,
|
||||
unsubscribe_method,
|
||||
},
|
||||
let pending_subscription = PendingSubscription {
|
||||
response_sender: sender,
|
||||
unsubscribe_method,
|
||||
};
|
||||
self.id_to_subscription
|
||||
.insert((id, chain_id), subscription_id_state);
|
||||
self.pending_subscriptions.insert(id, pending_subscription);
|
||||
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
if let Err(err) = result {
|
||||
@@ -209,14 +334,14 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
err.to_string()
|
||||
);
|
||||
let subscription_id_state = self
|
||||
.id_to_subscription
|
||||
.remove(&(id, chain_id))
|
||||
.pending_subscriptions
|
||||
.remove(&id)
|
||||
.expect("Channels are inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
if subscription_id_state
|
||||
.sub_id_sender
|
||||
.send(Err(LightClientRpcError::Request(err.to_string())))
|
||||
.response_sender
|
||||
.send(Err(LightClientRpcError::SmoldotError(err.to_string())))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
@@ -232,19 +357,75 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
|
||||
/// Parse the response received from the light client and sent it to the appropriate user.
|
||||
fn handle_rpc_response(&mut self, chain_id: smoldot_light::ChainId, response: String) {
|
||||
tracing::trace!(target: LOG_TARGET, "Received from smoldot response={response} chain={chain_id:?}");
|
||||
fn handle_rpc_response(&mut self, response: String) {
|
||||
let chain_id = self.chain_id;
|
||||
tracing::trace!(target: LOG_TARGET, "Received from smoldot response='{response}' chain={chain_id:?}");
|
||||
|
||||
match RpcResponse::from_str(&response) {
|
||||
Ok(RpcResponse::Error { id, error }) => {
|
||||
Ok(RpcResponse::Method { id, result }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send response. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
// Send the response back.
|
||||
if let Some(sender) = self.requests.remove(&id) {
|
||||
if sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(pending_subscription) = self.pending_subscriptions.remove(&id) {
|
||||
let Ok(sub_id) = serde_json::from_str::<SubscriptionId>(result.get()) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription id='{result}' chain={chain_id:?} is not a valid string",
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");
|
||||
|
||||
let (sub_tx, sub_rx) = mpsc::unbounded_channel();
|
||||
|
||||
// Send the method response and a channel to receive notifications back.
|
||||
if pending_subscription
|
||||
.response_sender
|
||||
.send(Ok((sub_id.clone(), sub_rx)))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send subscription ID response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Store the other end of the notif channel to send future subscription notifications to.
|
||||
self.subscriptions.insert(
|
||||
sub_id,
|
||||
ActiveSubscription {
|
||||
notification_sender: sub_tx,
|
||||
unsubscribe_method: pending_subscription.unsubscribe_method,
|
||||
},
|
||||
);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Response id={id} chain={chain_id:?} is not tracked",
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::MethodError { id, error }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send error. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(sender) = self.requests.remove(&(id, chain_id)) {
|
||||
if let Some(sender) = self.requests.remove(&id) {
|
||||
if sender
|
||||
.send(Err(LightClientRpcError::Request(error.to_string())))
|
||||
.send(Err(LightClientRpcError::JsonRpcError(JsonRpcError(error))))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
@@ -252,12 +433,10 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(subscription_id_state) =
|
||||
self.id_to_subscription.remove(&(id, chain_id))
|
||||
{
|
||||
} else if let Some(subscription_id_state) = self.pending_subscriptions.remove(&id) {
|
||||
if subscription_id_state
|
||||
.sub_id_sender
|
||||
.send(Err(LightClientRpcError::Request(error.to_string())))
|
||||
.response_sender
|
||||
.send(Err(LightClientRpcError::JsonRpcError(JsonRpcError(error))))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
@@ -267,95 +446,44 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::Method { id, result }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send response. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
// Send the response back.
|
||||
if let Some(sender) = self.requests.remove(&(id, chain_id)) {
|
||||
if sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(pending_subscription) =
|
||||
self.id_to_subscription.remove(&(id, chain_id))
|
||||
{
|
||||
let Ok(sub_id) = result
|
||||
.get()
|
||||
.trim_start_matches('"')
|
||||
.trim_end_matches('"')
|
||||
.parse::<usize>()
|
||||
else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription id={result} chain={chain_id:?} is not a valid number",
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");
|
||||
|
||||
let (sub_id_sender, active_subscription) = pending_subscription.into_parts();
|
||||
if sub_id_sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Track this subscription ID if send is successful.
|
||||
self.subscriptions
|
||||
.insert((sub_id, chain_id), active_subscription);
|
||||
} else {
|
||||
Ok(RpcResponse::Notification {
|
||||
method,
|
||||
subscription_id,
|
||||
result,
|
||||
}) => {
|
||||
let Some(active_subscription) = self.subscriptions.get_mut(&subscription_id) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Response id={id} chain={chain_id:?} is not tracked",
|
||||
"Subscription response id={subscription_id} chain={chain_id:?} method={method} is not tracked",
|
||||
);
|
||||
return;
|
||||
};
|
||||
if active_subscription
|
||||
.notification_sender
|
||||
.send(Ok(result))
|
||||
.is_err()
|
||||
{
|
||||
self.unsubscribe(&subscription_id, chain_id);
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::Subscription { method, id, result }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send subscription. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else {
|
||||
Ok(RpcResponse::NotificationError {
|
||||
method,
|
||||
subscription_id,
|
||||
error,
|
||||
}) => {
|
||||
let Some(active_subscription) = self.subscriptions.get_mut(&subscription_id) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription response id={id} chain={chain_id:?} method={method} is not tracked",
|
||||
"Subscription error id={subscription_id} chain={chain_id:?} method={method} is not tracked",
|
||||
);
|
||||
return;
|
||||
};
|
||||
if subscription_state.sender.send(result).is_ok() {
|
||||
// Nothing else to do, user is informed about the notification.
|
||||
return;
|
||||
}
|
||||
|
||||
// User dropped the receiver, unsubscribe from the method and remove internal tracking.
|
||||
let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else {
|
||||
// State is checked to be some above, so this should never happen.
|
||||
return;
|
||||
};
|
||||
// Make a call to unsubscribe from this method.
|
||||
let unsub_id = self.next_id(chain_id);
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
|
||||
unsub_id, subscription_state.unsubscribe_method, id
|
||||
);
|
||||
|
||||
if let Err(err) = self.client.json_rpc_request(request, chain_id) {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to unsubscribe id={id:?} chain={chain_id:?} method={:?} err={err:?}", subscription_state.unsubscribe_method
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(target: LOG_TARGET,"Unsubscribe id={id:?} chain={chain_id:?} method={:?}", subscription_state.unsubscribe_method);
|
||||
if active_subscription
|
||||
.notification_sender
|
||||
.send(Err(JsonRpcError(error)))
|
||||
.is_err()
|
||||
{
|
||||
self.unsubscribe(&subscription_id, chain_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -364,169 +492,28 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform the main background task:
|
||||
/// - receiving requests from subxt RPC method / subscriptions
|
||||
/// - provides the results from the light client back to users.
|
||||
pub async fn start_task<TPlat: smoldot_light::platform::PlatformRef>(
|
||||
&mut self,
|
||||
from_subxt: mpsc::UnboundedReceiver<FromSubxt>,
|
||||
from_node: Vec<AddedChain<TPlat>>,
|
||||
) {
|
||||
let from_subxt_event = tokio_stream::wrappers::UnboundedReceiverStream::new(from_subxt);
|
||||
// Unsubscribe from a subscription.
|
||||
fn unsubscribe(&mut self, subscription_id: &str, chain_id: smoldot_light::ChainId) {
|
||||
let Some(active_subscription) = self.subscriptions.remove(subscription_id) else {
|
||||
// Subscription doesn't exist so nothing more to do.
|
||||
return;
|
||||
};
|
||||
|
||||
let from_node = from_node.into_iter().map(|rpc| {
|
||||
Box::pin(futures::stream::unfold(rpc, |mut rpc| async move {
|
||||
let response = rpc.rpc_responses.next().await;
|
||||
Some(((response, rpc.chain_id), rpc))
|
||||
}))
|
||||
});
|
||||
let stream_combinator = futures::stream::select_all(from_node);
|
||||
// Build a call to unsubscribe from this method.
|
||||
let unsub_id = self.next_id();
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
|
||||
unsub_id, active_subscription.unsubscribe_method, subscription_id
|
||||
);
|
||||
|
||||
tokio::pin!(from_subxt_event, stream_combinator);
|
||||
|
||||
let mut from_subxt_event_fut = from_subxt_event.next();
|
||||
let mut from_node_event_fut = stream_combinator.next();
|
||||
|
||||
loop {
|
||||
match future::select(from_subxt_event_fut, from_node_event_fut).await {
|
||||
// Message received from subxt.
|
||||
Either::Left((subxt_message, previous_fut)) => {
|
||||
let Some(message) = subxt_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Subxt channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received register message {:?}",
|
||||
message
|
||||
);
|
||||
|
||||
self.handle_requests(message).await;
|
||||
|
||||
from_subxt_event_fut = from_subxt_event.next();
|
||||
from_node_event_fut = previous_fut;
|
||||
}
|
||||
// Message received from rpc handler: lightclient response.
|
||||
Either::Right((node_message, previous_fut)) => {
|
||||
let Some((node_message, chain)) = node_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Smoldot closed all RPC channels");
|
||||
break;
|
||||
};
|
||||
// Smoldot returns `None` if the chain has been removed (which subxt does not remove).
|
||||
let Some(response) = node_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received smoldot RPC chain {:?} result {:?}",
|
||||
chain, response
|
||||
);
|
||||
|
||||
self.handle_rpc_response(chain, response);
|
||||
|
||||
// Advance backend, save frontend.
|
||||
from_subxt_event_fut = previous_fut;
|
||||
from_node_event_fut = stream_combinator.next();
|
||||
}
|
||||
}
|
||||
// Submit it.
|
||||
if let Err(err) = self.client.json_rpc_request(request, chain_id) {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to unsubscribe id={subscription_id} chain={chain_id:?} method={:?} err={err:?}", active_subscription.unsubscribe_method
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(target: LOG_TARGET,"Unsubscribe id={subscription_id} chain={chain_id:?} method={:?}", active_subscription.unsubscribe_method);
|
||||
}
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Task closed");
|
||||
}
|
||||
}
|
||||
|
||||
/// The RPC response from the light-client.
|
||||
/// This can either be a response of a method, or a notification from a subscription.
|
||||
#[derive(Debug, Clone)]
|
||||
enum RpcResponse {
|
||||
Method {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// The result of the method call.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
Subscription {
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Subscription ID.
|
||||
id: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
Error {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// Error.
|
||||
error: Box<RawValue>,
|
||||
},
|
||||
}
|
||||
|
||||
impl std::str::FromStr for RpcResponse {
|
||||
type Err = serde_json::Error;
|
||||
|
||||
fn from_str(response: &str) -> Result<Self, Self::Err> {
|
||||
// Helper structures to deserialize from raw RPC strings.
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct Response {
|
||||
/// JSON-RPC version.
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
/// Request ID
|
||||
id: String,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationParams {
|
||||
/// The ID of the subscription.
|
||||
subscription: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct ResponseNotification {
|
||||
/// JSON-RPC version.
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Result.
|
||||
params: NotificationParams,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct ErrorResponse {
|
||||
/// JSON-RPC version.
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
/// Request ID.
|
||||
id: String,
|
||||
/// Error.
|
||||
error: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Check if the response can be mapped as an RPC method response.
|
||||
let result: Result<Response, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::Method {
|
||||
id: response.id,
|
||||
result: response.result,
|
||||
});
|
||||
}
|
||||
|
||||
let result: Result<ResponseNotification, _> = serde_json::from_str(response);
|
||||
if let Ok(notification) = result {
|
||||
return Ok(RpcResponse::Subscription {
|
||||
id: notification.params.subscription,
|
||||
method: notification.method,
|
||||
result: notification.params.result,
|
||||
});
|
||||
}
|
||||
|
||||
let error: ErrorResponse = serde_json::from_str(response)?;
|
||||
Ok(RpcResponse::Error {
|
||||
id: error.id,
|
||||
error: error.error,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use serde_json::Value;
|
||||
use std::borrow::Cow;
|
||||
|
||||
/// Something went wrong building chain config.
|
||||
#[non_exhaustive]
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ChainConfigError {
|
||||
/// The provided chain spec is the wrong shape.
|
||||
#[error("Invalid chain spec format")]
|
||||
InvalidSpecFormat,
|
||||
}
|
||||
|
||||
/// Configuration to connect to a chain.
|
||||
pub struct ChainConfig<'a> {
|
||||
// The chain spec to use.
|
||||
chain_spec: Cow<'a, str>,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a str> for ChainConfig<'a> {
|
||||
fn from(chain_spec: &'a str) -> Self {
|
||||
ChainConfig::chain_spec(chain_spec)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<String> for ChainConfig<'a> {
|
||||
fn from(chain_spec: String) -> Self {
|
||||
ChainConfig::chain_spec(chain_spec)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ChainConfig<'a> {
|
||||
/// Construct a chain config from a chain spec.
|
||||
pub fn chain_spec(chain_spec: impl Into<Cow<'a, str>>) -> Self {
|
||||
ChainConfig {
|
||||
chain_spec: chain_spec.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the bootnodes to the given ones.
|
||||
pub fn set_bootnodes<S: AsRef<str>>(
|
||||
self,
|
||||
bootnodes: impl IntoIterator<Item = S>,
|
||||
) -> Result<Self, ChainConfigError> {
|
||||
let mut chain_spec_json: Value = serde_json::from_str(&self.chain_spec)
|
||||
.map_err(|_e| ChainConfigError::InvalidSpecFormat)?;
|
||||
|
||||
if let Value::Object(map) = &mut chain_spec_json {
|
||||
let bootnodes = bootnodes
|
||||
.into_iter()
|
||||
.map(|s| Value::String(s.as_ref().to_owned()))
|
||||
.collect();
|
||||
|
||||
map.insert("bootNodes".to_string(), Value::Array(bootnodes));
|
||||
} else {
|
||||
return Err(ChainConfigError::InvalidSpecFormat);
|
||||
}
|
||||
|
||||
Ok(ChainConfig {
|
||||
chain_spec: Cow::Owned(chain_spec_json.to_string()),
|
||||
})
|
||||
}
|
||||
|
||||
// Used internally to fetch the chain spec back out.
|
||||
pub(crate) fn as_chain_spec(&self) -> &str {
|
||||
&self.chain_spec
|
||||
}
|
||||
}
|
||||
+239
-32
@@ -2,52 +2,259 @@
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
//! Low level light client implementation for RPC method and
|
||||
//! subscriptions requests.
|
||||
//!
|
||||
//! The client implementation supports both native and wasm
|
||||
//! environments.
|
||||
//!
|
||||
//! This leverages the smoldot crate to connect to the chain.
|
||||
//! A wrapper around [`smoldot_light`] which provides an light client capable of connecting
|
||||
//! to Substrate based chains.
|
||||
|
||||
#![deny(missing_docs)]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))]
|
||||
|
||||
#[cfg(any(
|
||||
all(feature = "web", feature = "native"),
|
||||
not(any(feature = "web", feature = "native"))
|
||||
))]
|
||||
compile_error!("subxt: exactly one of the 'web' and 'native' features should be used.");
|
||||
compile_error!("subxt-lightclient: exactly one of the 'web' and 'native' features should be used.");
|
||||
|
||||
mod background;
|
||||
mod client;
|
||||
mod platform;
|
||||
mod shared_client;
|
||||
// mod receiver;
|
||||
mod background;
|
||||
mod chain_config;
|
||||
mod rpc;
|
||||
|
||||
// Used to enable the js feature for wasm.
|
||||
#[cfg(feature = "web")]
|
||||
#[allow(unused_imports)]
|
||||
pub use getrandom as _;
|
||||
use background::{BackgroundTask, BackgroundTaskHandle};
|
||||
use futures::Stream;
|
||||
use platform::DefaultPlatform;
|
||||
use serde_json::value::RawValue;
|
||||
use shared_client::SharedClient;
|
||||
use std::future::Future;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub use client::{AddedChain, LightClientRpc, RawLightClientRpc};
|
||||
pub use chain_config::{ChainConfig, ChainConfigError};
|
||||
|
||||
/// Re-exports of the smoldot related objects.
|
||||
pub mod smoldot {
|
||||
pub use smoldot_light::{
|
||||
platform::PlatformRef, AddChainConfig, AddChainConfigJsonRpc, ChainId, Client,
|
||||
JsonRpcResponses,
|
||||
};
|
||||
|
||||
#[cfg(feature = "native")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
|
||||
pub use smoldot_light::platform::default::DefaultPlatform;
|
||||
}
|
||||
|
||||
/// Light client error.
|
||||
/// Things that can go wrong when constructing the [`LightClient`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum LightClientRpcError {
|
||||
pub enum LightClientError {
|
||||
/// Error encountered while adding the chain to the light-client.
|
||||
#[error("Failed to add the chain to the light client: {0}.")]
|
||||
AddChainError(String),
|
||||
/// Error originated while trying to submit a RPC request.
|
||||
#[error("RPC request cannot be sent: {0}.")]
|
||||
Request(String),
|
||||
}
|
||||
|
||||
/// Things that can go wrong calling methods of [`LightClientRpc`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum LightClientRpcError {
|
||||
/// Error response from the JSON-RPC server.
|
||||
#[error("{0}")]
|
||||
JsonRpcError(JsonRpcError),
|
||||
/// Smoldot could not handle the RPC call.
|
||||
#[error("Smoldot could not handle the RPC call: {0}.")]
|
||||
SmoldotError(String),
|
||||
/// Background task dropped.
|
||||
#[error("The background task was dropped.")]
|
||||
BackgroundTaskDropped,
|
||||
}
|
||||
|
||||
/// An error response from the JSON-RPC server (ie smoldot) in response to
|
||||
/// a method call or as a subscription notification.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("RPC Error: {0}.")]
|
||||
pub struct JsonRpcError(Box<RawValue>);
|
||||
|
||||
/// This represents a single light client connection to the network. Instantiate
|
||||
/// it with [`LightClient::relay_chain()`] to communicate with a relay chain, and
|
||||
/// then call [`LightClient::parachain()`] to establish connections to parachains.
|
||||
#[derive(Clone)]
|
||||
pub struct LightClient {
|
||||
client: SharedClient<DefaultPlatform>,
|
||||
relay_chain_id: smoldot_light::ChainId,
|
||||
}
|
||||
|
||||
impl LightClient {
|
||||
/// Given a chain spec, establish a connection to a relay chain. Any subsequent calls to
|
||||
/// [`LightClient::parachain()`] will set this as the relay chain.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The panic behaviour depends on the feature flag being used:
|
||||
///
|
||||
/// ## Native
|
||||
///
|
||||
/// Panics when called outside of a `tokio` runtime context.
|
||||
///
|
||||
/// ## Web
|
||||
///
|
||||
/// If smoldot panics, then the promise created will be leaked. For more details, see
|
||||
/// https://docs.rs/wasm-bindgen-futures/latest/wasm_bindgen_futures/fn.future_to_promise.html.
|
||||
pub fn relay_chain<'a>(
|
||||
chain_config: impl Into<ChainConfig<'a>>,
|
||||
) -> Result<(Self, LightClientRpc), LightClientError> {
|
||||
let mut client = smoldot_light::Client::new(platform::build_platform());
|
||||
let chain_config = chain_config.into();
|
||||
let chain_spec = chain_config.as_chain_spec();
|
||||
|
||||
let config = smoldot_light::AddChainConfig {
|
||||
specification: chain_spec,
|
||||
json_rpc: smoldot_light::AddChainConfigJsonRpc::Enabled {
|
||||
max_pending_requests: u32::MAX.try_into().unwrap(),
|
||||
max_subscriptions: u32::MAX,
|
||||
},
|
||||
database_content: "",
|
||||
potential_relay_chains: std::iter::empty(),
|
||||
user_data: (),
|
||||
};
|
||||
|
||||
let added_chain = client
|
||||
.add_chain(config)
|
||||
.map_err(|err| LightClientError::AddChainError(err.to_string()))?;
|
||||
|
||||
let relay_chain_id = added_chain.chain_id;
|
||||
let rpc_responses = added_chain
|
||||
.json_rpc_responses
|
||||
.expect("Light client RPC configured; qed");
|
||||
let shared_client: SharedClient<_> = client.into();
|
||||
|
||||
let light_client_rpc =
|
||||
LightClientRpc::new_raw(shared_client.clone(), relay_chain_id, rpc_responses);
|
||||
let light_client = Self {
|
||||
client: shared_client,
|
||||
relay_chain_id,
|
||||
};
|
||||
|
||||
Ok((light_client, light_client_rpc))
|
||||
}
|
||||
|
||||
/// Given a chain spec, establish a connection to a parachain.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The panic behaviour depends on the feature flag being used:
|
||||
///
|
||||
/// ## Native
|
||||
///
|
||||
/// Panics when called outside of a `tokio` runtime context.
|
||||
///
|
||||
/// ## Web
|
||||
///
|
||||
/// If smoldot panics, then the promise created will be leaked. For more details, see
|
||||
/// https://docs.rs/wasm-bindgen-futures/latest/wasm_bindgen_futures/fn.future_to_promise.html.
|
||||
pub fn parachain<'a>(
|
||||
&self,
|
||||
chain_config: impl Into<ChainConfig<'a>>,
|
||||
) -> Result<LightClientRpc, LightClientError> {
|
||||
let chain_config = chain_config.into();
|
||||
let chain_spec = chain_config.as_chain_spec();
|
||||
|
||||
let config = smoldot_light::AddChainConfig {
|
||||
specification: chain_spec,
|
||||
json_rpc: smoldot_light::AddChainConfigJsonRpc::Enabled {
|
||||
max_pending_requests: u32::MAX.try_into().unwrap(),
|
||||
max_subscriptions: u32::MAX,
|
||||
},
|
||||
database_content: "",
|
||||
potential_relay_chains: std::iter::once(self.relay_chain_id),
|
||||
user_data: (),
|
||||
};
|
||||
|
||||
let added_chain = self
|
||||
.client
|
||||
.add_chain(config)
|
||||
.map_err(|err| LightClientError::AddChainError(err.to_string()))?;
|
||||
|
||||
let chain_id = added_chain.chain_id;
|
||||
let rpc_responses = added_chain
|
||||
.json_rpc_responses
|
||||
.expect("Light client RPC configured; qed");
|
||||
|
||||
Ok(LightClientRpc::new_raw(
|
||||
self.client.clone(),
|
||||
chain_id,
|
||||
rpc_responses,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// This represents a single RPC connection to a specific chain, and is constructed by calling
|
||||
/// one of the methods on [`LightClient`]. Using this, you can make RPC requests to the chain.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LightClientRpc {
|
||||
handle: BackgroundTaskHandle,
|
||||
}
|
||||
|
||||
impl LightClientRpc {
|
||||
// Dev note: this would provide a "low leveL" interface if one is needed.
|
||||
// Do we actually need to provide this, or can we entirely hide Smoldot?
|
||||
pub(crate) fn new_raw<TPlat, TChain>(
|
||||
client: impl Into<SharedClient<TPlat, TChain>>,
|
||||
chain_id: smoldot_light::ChainId,
|
||||
rpc_responses: smoldot_light::JsonRpcResponses,
|
||||
) -> Self
|
||||
where
|
||||
TPlat: smoldot_light::platform::PlatformRef + Send + 'static,
|
||||
TChain: Send + 'static,
|
||||
{
|
||||
let (background_task, background_handle) =
|
||||
BackgroundTask::new(client.into(), chain_id, rpc_responses);
|
||||
|
||||
// For now we spawn the background task internally, but later we can expose
|
||||
// methods to give this back to the user so that they can exert backpressure.
|
||||
spawn(async move { background_task.run().await });
|
||||
|
||||
LightClientRpc {
|
||||
handle: background_handle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Make an RPC request to a chain, getting back a result.
|
||||
pub async fn request(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
) -> Result<Box<RawValue>, LightClientRpcError> {
|
||||
self.handle.request(method, params).await
|
||||
}
|
||||
|
||||
/// Subscribe to some RPC method, getting back a stream of notifications.
|
||||
pub async fn subscribe(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
unsub: String,
|
||||
) -> Result<LightClientRpcSubscription, LightClientRpcError> {
|
||||
let (id, notifications) = self.handle.subscribe(method, params, unsub).await?;
|
||||
Ok(LightClientRpcSubscription { id, notifications })
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream of notifications handed back when [`LightClientRpc::subscribe`] is called.
|
||||
pub struct LightClientRpcSubscription {
|
||||
notifications: mpsc::UnboundedReceiver<Result<Box<RawValue>, JsonRpcError>>,
|
||||
id: String,
|
||||
}
|
||||
|
||||
impl LightClientRpcSubscription {
|
||||
/// Return the subscription ID
|
||||
pub fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for LightClientRpcSubscription {
|
||||
type Item = Result<Box<RawValue>, JsonRpcError>;
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.notifications.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// A quick helper to spawn a task that works for WASM.
|
||||
fn spawn<F: Future + Send + 'static>(future: F) {
|
||||
#[cfg(feature = "native")]
|
||||
tokio::spawn(async move {
|
||||
future.await;
|
||||
});
|
||||
#[cfg(feature = "web")]
|
||||
wasm_bindgen_futures::spawn_local(async move {
|
||||
future.await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -11,16 +11,16 @@ mod wasm_platform;
|
||||
#[cfg(feature = "web")]
|
||||
mod wasm_socket;
|
||||
|
||||
pub use helpers::build_platform;
|
||||
pub use helpers::{build_platform, DefaultPlatform};
|
||||
|
||||
#[cfg(feature = "native")]
|
||||
mod helpers {
|
||||
use smoldot_light::platform::default::DefaultPlatform as Platform;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub type PlatformType = Arc<Platform>;
|
||||
pub type DefaultPlatform = Arc<Platform>;
|
||||
|
||||
pub fn build_platform() -> PlatformType {
|
||||
pub fn build_platform() -> DefaultPlatform {
|
||||
Platform::new(
|
||||
"subxt-light-client".into(),
|
||||
env!("CARGO_PKG_VERSION").into(),
|
||||
@@ -32,9 +32,9 @@ mod helpers {
|
||||
mod helpers {
|
||||
use super::wasm_platform::SubxtPlatform as Platform;
|
||||
|
||||
pub type PlatformType = Platform;
|
||||
pub type DefaultPlatform = Platform;
|
||||
|
||||
pub fn build_platform() -> PlatformType {
|
||||
pub fn build_platform() -> DefaultPlatform {
|
||||
Platform::new()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,14 +124,14 @@ impl PlatformRef for SubxtPlatform {
|
||||
port,
|
||||
} => {
|
||||
let addr = SocketAddr::from((ip, port));
|
||||
format!("ws://{}", addr.to_string())
|
||||
format!("ws://{}", addr)
|
||||
}
|
||||
Address::WebSocketIp {
|
||||
ip: IpAddr::V6(ip),
|
||||
port,
|
||||
} => {
|
||||
let addr = SocketAddr::from((ip, port));
|
||||
format!("ws://{}", addr.to_string())
|
||||
format!("ws://{}", addr)
|
||||
}
|
||||
|
||||
// The API user of the `PlatformRef` trait is never supposed to open connections of
|
||||
|
||||
@@ -111,7 +111,7 @@ impl WasmSocket {
|
||||
|
||||
let mut inner = inner.lock().expect("Mutex is poised; qed");
|
||||
let bytes = js_sys::Uint8Array::new(&buffer).to_vec();
|
||||
inner.data.extend(bytes.into_iter());
|
||||
inner.data.extend(bytes);
|
||||
|
||||
if let Some(waker) = inner.waker.take() {
|
||||
waker.wake();
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
/// The RPC response from the light-client.
|
||||
/// This can either be a response of a method, or a notification from a subscription.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RpcResponse {
|
||||
Method {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// The result of the method call.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
MethodError {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// Error.
|
||||
error: Box<RawValue>,
|
||||
},
|
||||
Notification {
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Subscription ID.
|
||||
subscription_id: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
NotificationError {
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Subscription ID.
|
||||
subscription_id: String,
|
||||
/// Result.
|
||||
error: Box<RawValue>,
|
||||
},
|
||||
}
|
||||
|
||||
impl std::str::FromStr for RpcResponse {
|
||||
type Err = ();
|
||||
|
||||
fn from_str(response: &str) -> Result<Self, Self::Err> {
|
||||
// Valid response
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct Response {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
id: String,
|
||||
result: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Error response
|
||||
#[derive(Deserialize)]
|
||||
struct ResponseError {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
id: String,
|
||||
error: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Valid notification (subscription) response
|
||||
#[derive(Deserialize)]
|
||||
struct Notification {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: NotificationResultParams,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationResultParams {
|
||||
subscription: String,
|
||||
result: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Error notification (subscription) response
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationError {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: NotificationErrorParams,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationErrorParams {
|
||||
/// The ID of the subscription.
|
||||
subscription: String,
|
||||
error: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Try deserializing the response payload to one of the above. We can
|
||||
// do this more efficiently eg how jsonrpsee_types does.
|
||||
|
||||
let result: Result<Response, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::Method {
|
||||
id: response.id,
|
||||
result: response.result,
|
||||
});
|
||||
}
|
||||
let result: Result<Notification, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::Notification {
|
||||
subscription_id: response.params.subscription,
|
||||
method: response.method,
|
||||
result: response.params.result,
|
||||
});
|
||||
}
|
||||
let result: Result<ResponseError, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::MethodError {
|
||||
id: response.id,
|
||||
error: response.error,
|
||||
});
|
||||
}
|
||||
let result: Result<NotificationError, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::NotificationError {
|
||||
method: response.method,
|
||||
subscription_id: response.params.subscription,
|
||||
error: response.params.error,
|
||||
});
|
||||
}
|
||||
|
||||
// We couldn't decode into any of the above. We could pick one of the above`
|
||||
// errors to return, but there's no real point since the string is obviously
|
||||
// different from any of them.
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use smoldot_light as sl;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// This wraps [`smoldot_light::Client`] so that it can be cloned and shared.
|
||||
#[derive(Clone)]
|
||||
pub struct SharedClient<TPlat: sl::platform::PlatformRef, TChain = ()> {
|
||||
client: Arc<Mutex<sl::Client<TPlat, TChain>>>,
|
||||
}
|
||||
|
||||
impl<TPlat: sl::platform::PlatformRef, TChain> From<sl::Client<TPlat, TChain>>
|
||||
for SharedClient<TPlat, TChain>
|
||||
{
|
||||
fn from(client: sl::Client<TPlat, TChain>) -> Self {
|
||||
SharedClient {
|
||||
client: Arc::new(Mutex::new(client)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TPlat: sl::platform::PlatformRef, TChain> SharedClient<TPlat, TChain> {
|
||||
/// Delegates to [`smoldot_light::Client::json_rpc_request()`].
|
||||
pub(crate) fn json_rpc_request(
|
||||
&self,
|
||||
json_rpc_request: impl Into<String>,
|
||||
chain_id: sl::ChainId,
|
||||
) -> Result<(), sl::HandleRpcError> {
|
||||
self.client
|
||||
.lock()
|
||||
.expect("mutex should not be poisoned")
|
||||
.json_rpc_request(json_rpc_request, chain_id)
|
||||
}
|
||||
|
||||
/// Delegates to [`smoldot_light::Client::add_chain()`].
|
||||
pub(crate) fn add_chain(
|
||||
&self,
|
||||
config: sl::AddChainConfig<'_, TChain, impl Iterator<Item = sl::ChainId>>,
|
||||
) -> Result<sl::AddChainSuccess, sl::AddChainError> {
|
||||
self.client
|
||||
.lock()
|
||||
.expect("mutex should not be poisoned")
|
||||
.add_chain(config)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user