mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-11 20:01:08 +00:00
Rework light client (#1475)
* WIP second pass over light client code for simpler API * First pass new light client * pub(crate) LightClientRpc::new_raw(), and fmt * Update examples and add back a way to configure boot nodes and fetch chainspec from a URL * Fix light client examples * remove unused deps and tidy lightclient feature flags * fix wasm error * LightClientRpc can be cloned * update light client tests * Other small fixes * exclude mod unless jsonrpsee * Fix wasm-lightclient-tests * add back docsrs bit and web+native feature flag compile error * update book and light client example names * fix docs
This commit is contained in:
+330
-363
@@ -1,43 +1,47 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// Copyright 2019-2024 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use futures::stream::StreamExt;
|
||||
use futures_util::future::{self, Either};
|
||||
use serde::Deserialize;
|
||||
use crate::rpc::RpcResponse;
|
||||
use crate::shared_client::SharedClient;
|
||||
use crate::{JsonRpcError, LightClientRpcError};
|
||||
use futures::{stream::StreamExt, FutureExt};
|
||||
use serde_json::value::RawValue;
|
||||
use smoldot_light::platform::PlatformRef;
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
use crate::client::AddedChain;
|
||||
const LOG_TARGET: &str = "subxt-light-client-background-task";
|
||||
|
||||
use super::LightClientRpcError;
|
||||
use smoldot_light::ChainId;
|
||||
|
||||
const LOG_TARGET: &str = "subxt-light-client-background";
|
||||
|
||||
/// The response of an RPC method.
|
||||
/// Response from [`BackgroundTaskHandle::request()`].
|
||||
pub type MethodResponse = Result<Box<RawValue>, LightClientRpcError>;
|
||||
|
||||
/// Response from [`BackgroundTaskHandle::subscribe()`].
|
||||
pub type SubscriptionResponse = Result<
|
||||
(
|
||||
SubscriptionId,
|
||||
mpsc::UnboundedReceiver<Result<Box<RawValue>, JsonRpcError>>,
|
||||
),
|
||||
LightClientRpcError,
|
||||
>;
|
||||
|
||||
/// Type of subscription IDs we can get back.
|
||||
pub type SubscriptionId = String;
|
||||
|
||||
/// Message protocol between the front-end client that submits the RPC requests
|
||||
/// and the backend handler that produces responses from the chain.
|
||||
///
|
||||
/// The light client uses a single object [`smoldot_light::JsonRpcResponses`] to
|
||||
/// handle all requests and subscriptions from a chain. A background task is spawned
|
||||
/// to multiplex the rpc responses and to provide them back to their rightful submitters.
|
||||
/// and the background task which fetches responses from Smoldot. Hidden behind
|
||||
/// the [`BackgroundTaskHandle`].
|
||||
#[derive(Debug)]
|
||||
pub enum FromSubxt {
|
||||
enum Message {
|
||||
/// The RPC method request.
|
||||
Request {
|
||||
/// The method of the request.
|
||||
method: String,
|
||||
/// The parameters of the request.
|
||||
params: String,
|
||||
/// Channel used to send back the result.
|
||||
params: Option<Box<RawValue>>,
|
||||
/// Channel used to send back the method response.
|
||||
sender: oneshot::Sender<MethodResponse>,
|
||||
/// The ID of the chain used to identify the chain.
|
||||
chain_id: ChainId,
|
||||
},
|
||||
/// The RPC subscription (pub/sub) request.
|
||||
Subscription {
|
||||
@@ -46,29 +50,160 @@ pub enum FromSubxt {
|
||||
/// The method to unsubscribe.
|
||||
unsubscribe_method: String,
|
||||
/// The parameters of the request.
|
||||
params: String,
|
||||
/// Channel used to send back the subscription ID if successful.
|
||||
sub_id: oneshot::Sender<MethodResponse>,
|
||||
/// Channel used to send back the notifications.
|
||||
sender: mpsc::UnboundedSender<Box<RawValue>>,
|
||||
/// The ID of the chain used to identify the chain.
|
||||
chain_id: ChainId,
|
||||
params: Option<Box<RawValue>>,
|
||||
/// Channel used to send back the subscription response.
|
||||
sender: oneshot::Sender<SubscriptionResponse>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Background task data.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
|
||||
/// Smoldot light client implementation that leverages the exposed platform.
|
||||
client: smoldot_light::Client<TPlatform, TChain>,
|
||||
/// Per-chain data.
|
||||
chain_data: HashMap<smoldot_light::ChainId, ChainData>,
|
||||
/// A handle to communicate with the background task.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BackgroundTaskHandle {
|
||||
to_backend: mpsc::UnboundedSender<Message>,
|
||||
}
|
||||
|
||||
/// The data that we store for each chain.
|
||||
#[derive(Default)]
|
||||
struct ChainData {
|
||||
/// Generates an unique monotonically increasing ID for each chain.
|
||||
impl BackgroundTaskHandle {
|
||||
/// Make an RPC request via the background task.
|
||||
pub async fn request(&self, method: String, params: Option<Box<RawValue>>) -> MethodResponse {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_backend
|
||||
.send(Message::Request {
|
||||
method,
|
||||
params,
|
||||
sender: tx,
|
||||
})
|
||||
.map_err(|_e| LightClientRpcError::BackgroundTaskDropped)?;
|
||||
|
||||
match rx.await {
|
||||
Err(_e) => Err(LightClientRpcError::BackgroundTaskDropped),
|
||||
Ok(response) => response,
|
||||
}
|
||||
}
|
||||
|
||||
/// Subscribe to some RPC method via the background task.
|
||||
pub async fn subscribe(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
unsubscribe_method: String,
|
||||
) -> SubscriptionResponse {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.to_backend
|
||||
.send(Message::Subscription {
|
||||
method,
|
||||
params,
|
||||
unsubscribe_method,
|
||||
sender: tx,
|
||||
})
|
||||
.map_err(|_e| LightClientRpcError::BackgroundTaskDropped)?;
|
||||
|
||||
match rx.await {
|
||||
Err(_e) => Err(LightClientRpcError::BackgroundTaskDropped),
|
||||
Ok(response) => response,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A background task which runs with [`BackgroundTask::run()`] and manages messages
|
||||
/// coming to/from Smoldot.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
|
||||
channels: BackgroundTaskChannels,
|
||||
data: BackgroundTaskData<TPlatform, TChain>,
|
||||
}
|
||||
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
/// Constructs a new [`BackgroundTask`].
|
||||
pub(crate) fn new(
|
||||
client: SharedClient<TPlatform, TChain>,
|
||||
chain_id: smoldot_light::ChainId,
|
||||
from_back: smoldot_light::JsonRpcResponses,
|
||||
) -> (BackgroundTask<TPlatform, TChain>, BackgroundTaskHandle) {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
||||
let bg_task = BackgroundTask {
|
||||
channels: BackgroundTaskChannels {
|
||||
from_front: UnboundedReceiverStream::new(rx),
|
||||
from_back,
|
||||
},
|
||||
data: BackgroundTaskData {
|
||||
client,
|
||||
chain_id,
|
||||
last_request_id: 0,
|
||||
pending_subscriptions: HashMap::new(),
|
||||
requests: HashMap::new(),
|
||||
subscriptions: HashMap::new(),
|
||||
},
|
||||
};
|
||||
|
||||
let bg_handle = BackgroundTaskHandle { to_backend: tx };
|
||||
|
||||
(bg_task, bg_handle)
|
||||
}
|
||||
|
||||
/// Run the background task, which:
|
||||
/// - Forwards messages/subscription requests to Smoldot from the front end.
|
||||
/// - Forwards responses back from Smoldot to the front end.
|
||||
pub async fn run(self) {
|
||||
let chain_id = self.data.chain_id;
|
||||
let mut channels = self.channels;
|
||||
let mut data = self.data;
|
||||
|
||||
loop {
|
||||
tokio::pin! {
|
||||
let from_front_fut = channels.from_front.next().fuse();
|
||||
let from_back_fut = channels.from_back.next().fuse();
|
||||
}
|
||||
|
||||
futures::select! {
|
||||
// Message coming from the front end/client.
|
||||
front_message = from_front_fut => {
|
||||
let Some(message) = front_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Subxt channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received register message {:?}",
|
||||
message
|
||||
);
|
||||
|
||||
data.handle_requests(message).await;
|
||||
},
|
||||
// Message coming from Smoldot.
|
||||
back_message = from_back_fut => {
|
||||
let Some(back_message) = back_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received smoldot RPC chain {:?} result {:?}",
|
||||
chain_id, back_message
|
||||
);
|
||||
|
||||
data.handle_rpc_response(back_message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Task closed");
|
||||
}
|
||||
}
|
||||
|
||||
struct BackgroundTaskChannels {
|
||||
/// Messages sent into this background task from the front end.
|
||||
from_front: UnboundedReceiverStream<Message>,
|
||||
/// Messages sent into the background task from Smoldot.
|
||||
from_back: smoldot_light::JsonRpcResponses,
|
||||
}
|
||||
|
||||
struct BackgroundTaskData<TPlatform: PlatformRef, TChain> {
|
||||
/// A smoldot light client that can be shared.
|
||||
client: SharedClient<TPlatform, TChain>,
|
||||
/// Knowing the chain ID helps with debugging, but isn't overwise necessary.
|
||||
chain_id: smoldot_light::ChainId,
|
||||
/// Know which Id to use next for new requests/subscriptions.
|
||||
last_request_id: usize,
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
requests: HashMap<usize, oneshot::Sender<MethodResponse>>,
|
||||
@@ -78,20 +213,12 @@ struct ChainData {
|
||||
/// The RPC method request is made in the background and the response should
|
||||
/// not be sent back to the user.
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
id_to_subscription: HashMap<usize, PendingSubscription>,
|
||||
pending_subscriptions: HashMap<usize, PendingSubscription>,
|
||||
/// Map the subscription ID to the frontend `Sender`.
|
||||
///
|
||||
/// The subscription ID is entirely generated by the node (smoldot). Therefore, it is
|
||||
/// possible for two distinct subscriptions of different chains to have the same subscription ID.
|
||||
subscriptions: HashMap<usize, ActiveSubscription>,
|
||||
}
|
||||
|
||||
impl ChainData {
|
||||
/// Fetch and increment the request ID.
|
||||
fn next_id(&mut self) -> usize {
|
||||
self.last_request_id = self.last_request_id.wrapping_add(1);
|
||||
self.last_request_id
|
||||
}
|
||||
subscriptions: HashMap<String, ActiveSubscription>,
|
||||
}
|
||||
|
||||
/// The state needed to resolve the subscription ID and send
|
||||
@@ -100,72 +227,52 @@ struct PendingSubscription {
|
||||
/// Send the method response ID back to the user.
|
||||
///
|
||||
/// It contains the subscription ID if successful, or an JSON RPC error object.
|
||||
sub_id_sender: oneshot::Sender<MethodResponse>,
|
||||
/// The subscription state that is added to the `subscriptions` map only
|
||||
/// if the subscription ID is successfully sent back to the user.
|
||||
subscription_state: ActiveSubscription,
|
||||
}
|
||||
|
||||
impl PendingSubscription {
|
||||
/// Transforms the pending subscription into an active subscription.
|
||||
fn into_parts(self) -> (oneshot::Sender<MethodResponse>, ActiveSubscription) {
|
||||
(self.sub_id_sender, self.subscription_state)
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of the subscription.
|
||||
struct ActiveSubscription {
|
||||
/// Channel to send the subscription notifications back to frontend.
|
||||
sender: mpsc::UnboundedSender<Box<RawValue>>,
|
||||
response_sender: oneshot::Sender<SubscriptionResponse>,
|
||||
/// The unsubscribe method to call when the user drops the receiver
|
||||
/// part of the channel.
|
||||
unsubscribe_method: String,
|
||||
}
|
||||
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
/// Constructs a new [`BackgroundTask`].
|
||||
pub fn new(
|
||||
client: smoldot_light::Client<TPlatform, TChain>,
|
||||
) -> BackgroundTask<TPlatform, TChain> {
|
||||
BackgroundTask {
|
||||
client,
|
||||
chain_data: Default::default(),
|
||||
}
|
||||
}
|
||||
/// The state of the subscription.
|
||||
struct ActiveSubscription {
|
||||
/// Channel to send the subscription notifications back to frontend.
|
||||
notification_sender: mpsc::UnboundedSender<Result<Box<RawValue>, JsonRpcError>>,
|
||||
/// The unsubscribe method to call when the user drops the receiver
|
||||
/// part of the channel.
|
||||
unsubscribe_method: String,
|
||||
}
|
||||
|
||||
fn for_chain_id(
|
||||
&mut self,
|
||||
chain_id: smoldot_light::ChainId,
|
||||
) -> (
|
||||
&mut ChainData,
|
||||
&mut smoldot_light::Client<TPlatform, TChain>,
|
||||
) {
|
||||
let chain_data = self.chain_data.entry(chain_id).or_default();
|
||||
let client = &mut self.client;
|
||||
(chain_data, client)
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTaskData<TPlatform, TChain> {
|
||||
/// Fetch and increment the request ID.
|
||||
fn next_id(&mut self) -> usize {
|
||||
self.last_request_id = self.last_request_id.wrapping_add(1);
|
||||
self.last_request_id
|
||||
}
|
||||
|
||||
/// Handle the registration messages received from the user.
|
||||
async fn handle_requests(&mut self, message: FromSubxt) {
|
||||
async fn handle_requests(&mut self, message: Message) {
|
||||
match message {
|
||||
FromSubxt::Request {
|
||||
Message::Request {
|
||||
method,
|
||||
params,
|
||||
sender,
|
||||
chain_id,
|
||||
} => {
|
||||
let (chain_data, client) = self.for_chain_id(chain_id);
|
||||
let id = chain_data.next_id();
|
||||
let id = self.next_id();
|
||||
let chain_id = self.chain_id;
|
||||
|
||||
let params = match ¶ms {
|
||||
Some(params) => params.get(),
|
||||
None => "null",
|
||||
};
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
|
||||
id, method, params
|
||||
);
|
||||
|
||||
chain_data.requests.insert(id, sender);
|
||||
self.requests.insert(id, sender);
|
||||
tracing::trace!(target: LOG_TARGET, "Tracking request id={id} chain={chain_id:?}");
|
||||
|
||||
let result = client.json_rpc_request(request, chain_id);
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
if let Err(err) = result {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
@@ -173,14 +280,14 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
err.to_string()
|
||||
);
|
||||
|
||||
let sender = chain_data
|
||||
let sender = self
|
||||
.requests
|
||||
.remove(&id)
|
||||
.expect("Channel is inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
if sender
|
||||
.send(Err(LightClientRpcError::Request(err.to_string())))
|
||||
.send(Err(LightClientRpcError::SmoldotError(err.to_string())))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
@@ -192,52 +299,49 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
tracing::trace!(target: LOG_TARGET, "Submitted to smoldot request with id={id}");
|
||||
}
|
||||
}
|
||||
FromSubxt::Subscription {
|
||||
Message::Subscription {
|
||||
method,
|
||||
unsubscribe_method,
|
||||
params,
|
||||
sub_id,
|
||||
sender,
|
||||
chain_id,
|
||||
} => {
|
||||
let (chain_data, client) = self.for_chain_id(chain_id);
|
||||
let id = chain_data.next_id();
|
||||
let id = self.next_id();
|
||||
let chain_id = self.chain_id;
|
||||
|
||||
// For subscriptions we need to make a plain RPC request to the subscription method.
|
||||
// The server will return as a result the subscription ID.
|
||||
let params = match ¶ms {
|
||||
Some(params) => params.get(),
|
||||
None => "null",
|
||||
};
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
|
||||
id, method, params
|
||||
);
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}");
|
||||
let subscription_id_state = PendingSubscription {
|
||||
sub_id_sender: sub_id,
|
||||
subscription_state: ActiveSubscription {
|
||||
sender,
|
||||
unsubscribe_method,
|
||||
},
|
||||
let pending_subscription = PendingSubscription {
|
||||
response_sender: sender,
|
||||
unsubscribe_method,
|
||||
};
|
||||
chain_data
|
||||
.id_to_subscription
|
||||
.insert(id, subscription_id_state);
|
||||
self.pending_subscriptions.insert(id, pending_subscription);
|
||||
|
||||
let result = client.json_rpc_request(request, chain_id);
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
if let Err(err) = result {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send RPC request to lightclient {:?}",
|
||||
err.to_string()
|
||||
);
|
||||
let subscription_id_state = chain_data
|
||||
.id_to_subscription
|
||||
let subscription_id_state = self
|
||||
.pending_subscriptions
|
||||
.remove(&id)
|
||||
.expect("Channels are inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
if subscription_id_state
|
||||
.sub_id_sender
|
||||
.send(Err(LightClientRpcError::Request(err.to_string())))
|
||||
.response_sender
|
||||
.send(Err(LightClientRpcError::SmoldotError(err.to_string())))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
@@ -253,20 +357,75 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
|
||||
/// Parse the response received from the light client and sent it to the appropriate user.
|
||||
fn handle_rpc_response(&mut self, chain_id: smoldot_light::ChainId, response: String) {
|
||||
tracing::trace!(target: LOG_TARGET, "Received from smoldot response={response} chain={chain_id:?}");
|
||||
let (chain_data, _client) = self.for_chain_id(chain_id);
|
||||
fn handle_rpc_response(&mut self, response: String) {
|
||||
let chain_id = self.chain_id;
|
||||
tracing::trace!(target: LOG_TARGET, "Received from smoldot response='{response}' chain={chain_id:?}");
|
||||
|
||||
match RpcResponse::from_str(&response) {
|
||||
Ok(RpcResponse::Error { id, error }) => {
|
||||
Ok(RpcResponse::Method { id, result }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send response. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
// Send the response back.
|
||||
if let Some(sender) = self.requests.remove(&id) {
|
||||
if sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(pending_subscription) = self.pending_subscriptions.remove(&id) {
|
||||
let Ok(sub_id) = serde_json::from_str::<SubscriptionId>(result.get()) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription id='{result}' chain={chain_id:?} is not a valid string",
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");
|
||||
|
||||
let (sub_tx, sub_rx) = mpsc::unbounded_channel();
|
||||
|
||||
// Send the method response and a channel to receive notifications back.
|
||||
if pending_subscription
|
||||
.response_sender
|
||||
.send(Ok((sub_id.clone(), sub_rx)))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send subscription ID response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Store the other end of the notif channel to send future subscription notifications to.
|
||||
self.subscriptions.insert(
|
||||
sub_id,
|
||||
ActiveSubscription {
|
||||
notification_sender: sub_tx,
|
||||
unsubscribe_method: pending_subscription.unsubscribe_method,
|
||||
},
|
||||
);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Response id={id} chain={chain_id:?} is not tracked",
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::MethodError { id, error }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send error. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(sender) = chain_data.requests.remove(&id) {
|
||||
if let Some(sender) = self.requests.remove(&id) {
|
||||
if sender
|
||||
.send(Err(LightClientRpcError::Request(error.to_string())))
|
||||
.send(Err(LightClientRpcError::JsonRpcError(JsonRpcError(error))))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
@@ -274,12 +433,10 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(subscription_id_state) =
|
||||
chain_data.id_to_subscription.remove(&id)
|
||||
{
|
||||
} else if let Some(subscription_id_state) = self.pending_subscriptions.remove(&id) {
|
||||
if subscription_id_state
|
||||
.sub_id_sender
|
||||
.send(Err(LightClientRpcError::Request(error.to_string())))
|
||||
.response_sender
|
||||
.send(Err(LightClientRpcError::JsonRpcError(JsonRpcError(error))))
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
@@ -289,93 +446,44 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::Method { id, result }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send response. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
// Send the response back.
|
||||
if let Some(sender) = chain_data.requests.remove(&id) {
|
||||
if sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(pending_subscription) = chain_data.id_to_subscription.remove(&id)
|
||||
{
|
||||
let Ok(sub_id) = result
|
||||
.get()
|
||||
.trim_start_matches('"')
|
||||
.trim_end_matches('"')
|
||||
.parse::<usize>()
|
||||
else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription id={result} chain={chain_id:?} is not a valid number",
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");
|
||||
|
||||
let (sub_id_sender, active_subscription) = pending_subscription.into_parts();
|
||||
if sub_id_sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Track this subscription ID if send is successful.
|
||||
chain_data.subscriptions.insert(sub_id, active_subscription);
|
||||
} else {
|
||||
Ok(RpcResponse::Notification {
|
||||
method,
|
||||
subscription_id,
|
||||
result,
|
||||
}) => {
|
||||
let Some(active_subscription) = self.subscriptions.get_mut(&subscription_id) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Response id={id} chain={chain_id:?} is not tracked",
|
||||
"Subscription response id={subscription_id} chain={chain_id:?} method={method} is not tracked",
|
||||
);
|
||||
return;
|
||||
};
|
||||
if active_subscription
|
||||
.notification_sender
|
||||
.send(Ok(result))
|
||||
.is_err()
|
||||
{
|
||||
self.unsubscribe(&subscription_id, chain_id);
|
||||
}
|
||||
}
|
||||
Ok(RpcResponse::Subscription { method, id, result }) => {
|
||||
let Ok(id) = id.parse::<usize>() else {
|
||||
tracing::warn!(target: LOG_TARGET, "Cannot send subscription. Id={id} chain={chain_id:?} is not a valid number");
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(subscription_state) = chain_data.subscriptions.get_mut(&id) else {
|
||||
Ok(RpcResponse::NotificationError {
|
||||
method,
|
||||
subscription_id,
|
||||
error,
|
||||
}) => {
|
||||
let Some(active_subscription) = self.subscriptions.get_mut(&subscription_id) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription response id={id} chain={chain_id:?} method={method} is not tracked",
|
||||
"Subscription error id={subscription_id} chain={chain_id:?} method={method} is not tracked",
|
||||
);
|
||||
return;
|
||||
};
|
||||
if subscription_state.sender.send(result).is_ok() {
|
||||
// Nothing else to do, user is informed about the notification.
|
||||
return;
|
||||
}
|
||||
|
||||
// User dropped the receiver, unsubscribe from the method and remove internal tracking.
|
||||
let Some(subscription_state) = chain_data.subscriptions.remove(&id) else {
|
||||
// State is checked to be some above, so this should never happen.
|
||||
return;
|
||||
};
|
||||
// Make a call to unsubscribe from this method.
|
||||
let unsub_id = chain_data.next_id();
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
|
||||
unsub_id, subscription_state.unsubscribe_method, id
|
||||
);
|
||||
|
||||
if let Err(err) = self.client.json_rpc_request(request, chain_id) {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to unsubscribe id={id:?} chain={chain_id:?} method={:?} err={err:?}", subscription_state.unsubscribe_method
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(target: LOG_TARGET,"Unsubscribe id={id:?} chain={chain_id:?} method={:?}", subscription_state.unsubscribe_method);
|
||||
if active_subscription
|
||||
.notification_sender
|
||||
.send(Err(JsonRpcError(error)))
|
||||
.is_err()
|
||||
{
|
||||
self.unsubscribe(&subscription_id, chain_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -384,169 +492,28 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Perform the main background task:
|
||||
/// - receiving requests from subxt RPC method / subscriptions
|
||||
/// - provides the results from the light client back to users.
|
||||
pub async fn start_task(
|
||||
&mut self,
|
||||
from_subxt: mpsc::UnboundedReceiver<FromSubxt>,
|
||||
from_node: Vec<AddedChain>,
|
||||
) {
|
||||
let from_subxt_event = tokio_stream::wrappers::UnboundedReceiverStream::new(from_subxt);
|
||||
// Unsubscribe from a subscription.
|
||||
fn unsubscribe(&mut self, subscription_id: &str, chain_id: smoldot_light::ChainId) {
|
||||
let Some(active_subscription) = self.subscriptions.remove(subscription_id) else {
|
||||
// Subscription doesn't exist so nothing more to do.
|
||||
return;
|
||||
};
|
||||
|
||||
let from_node = from_node.into_iter().map(|rpc| {
|
||||
Box::pin(futures::stream::unfold(rpc, |mut rpc| async move {
|
||||
let response = rpc.rpc_responses.next().await;
|
||||
Some(((response, rpc.chain_id), rpc))
|
||||
}))
|
||||
});
|
||||
let stream_combinator = futures::stream::select_all(from_node);
|
||||
// Build a call to unsubscribe from this method.
|
||||
let unsub_id = self.next_id();
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
|
||||
unsub_id, active_subscription.unsubscribe_method, subscription_id
|
||||
);
|
||||
|
||||
tokio::pin!(from_subxt_event, stream_combinator);
|
||||
|
||||
let mut from_subxt_event_fut = from_subxt_event.next();
|
||||
let mut from_node_event_fut = stream_combinator.next();
|
||||
|
||||
loop {
|
||||
match future::select(from_subxt_event_fut, from_node_event_fut).await {
|
||||
// Message received from subxt.
|
||||
Either::Left((subxt_message, previous_fut)) => {
|
||||
let Some(message) = subxt_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Subxt channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received register message {:?}",
|
||||
message
|
||||
);
|
||||
|
||||
self.handle_requests(message).await;
|
||||
|
||||
from_subxt_event_fut = from_subxt_event.next();
|
||||
from_node_event_fut = previous_fut;
|
||||
}
|
||||
// Message received from rpc handler: lightclient response.
|
||||
Either::Right((node_message, previous_fut)) => {
|
||||
let Some((node_message, chain)) = node_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Smoldot closed all RPC channels");
|
||||
break;
|
||||
};
|
||||
// Smoldot returns `None` if the chain has been removed (which subxt does not remove).
|
||||
let Some(response) = node_message else {
|
||||
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
|
||||
break;
|
||||
};
|
||||
tracing::trace!(
|
||||
target: LOG_TARGET,
|
||||
"Received smoldot RPC chain {:?} result {:?}",
|
||||
chain, response
|
||||
);
|
||||
|
||||
self.handle_rpc_response(chain, response);
|
||||
|
||||
// Advance backend, save frontend.
|
||||
from_subxt_event_fut = previous_fut;
|
||||
from_node_event_fut = stream_combinator.next();
|
||||
}
|
||||
}
|
||||
// Submit it.
|
||||
if let Err(err) = self.client.json_rpc_request(request, chain_id) {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to unsubscribe id={subscription_id} chain={chain_id:?} method={:?} err={err:?}", active_subscription.unsubscribe_method
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(target: LOG_TARGET,"Unsubscribe id={subscription_id} chain={chain_id:?} method={:?}", active_subscription.unsubscribe_method);
|
||||
}
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Task closed");
|
||||
}
|
||||
}
|
||||
|
||||
/// The RPC response from the light-client.
|
||||
/// This can either be a response of a method, or a notification from a subscription.
|
||||
#[derive(Debug, Clone)]
|
||||
enum RpcResponse {
|
||||
Method {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// The result of the method call.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
Subscription {
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Subscription ID.
|
||||
id: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
Error {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// Error.
|
||||
error: Box<RawValue>,
|
||||
},
|
||||
}
|
||||
|
||||
impl std::str::FromStr for RpcResponse {
|
||||
type Err = serde_json::Error;
|
||||
|
||||
fn from_str(response: &str) -> Result<Self, Self::Err> {
|
||||
// Helper structures to deserialize from raw RPC strings.
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct Response {
|
||||
/// JSON-RPC version.
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
/// Request ID
|
||||
id: String,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationParams {
|
||||
/// The ID of the subscription.
|
||||
subscription: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct ResponseNotification {
|
||||
/// JSON-RPC version.
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Result.
|
||||
params: NotificationParams,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct ErrorResponse {
|
||||
/// JSON-RPC version.
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
/// Request ID.
|
||||
id: String,
|
||||
/// Error.
|
||||
error: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Check if the response can be mapped as an RPC method response.
|
||||
let result: Result<Response, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::Method {
|
||||
id: response.id,
|
||||
result: response.result,
|
||||
});
|
||||
}
|
||||
|
||||
let result: Result<ResponseNotification, _> = serde_json::from_str(response);
|
||||
if let Ok(notification) = result {
|
||||
return Ok(RpcResponse::Subscription {
|
||||
id: notification.params.subscription,
|
||||
method: notification.method,
|
||||
result: notification.params.result,
|
||||
});
|
||||
}
|
||||
|
||||
let error: ErrorResponse = serde_json::from_str(response)?;
|
||||
Ok(RpcResponse::Error {
|
||||
id: error.id,
|
||||
error: error.error,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use serde_json::Value;
|
||||
use std::borrow::Cow;
|
||||
|
||||
/// Something went wrong building chain config.
|
||||
#[non_exhaustive]
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum ChainConfigError {
|
||||
/// The provided chain spec is the wrong shape.
|
||||
#[error("Invalid chain spec format")]
|
||||
InvalidSpecFormat,
|
||||
}
|
||||
|
||||
/// Configuration to connect to a chain.
|
||||
pub struct ChainConfig<'a> {
|
||||
// The chain spec to use.
|
||||
chain_spec: Cow<'a, str>,
|
||||
}
|
||||
|
||||
impl<'a> From<&'a str> for ChainConfig<'a> {
|
||||
fn from(chain_spec: &'a str) -> Self {
|
||||
ChainConfig::chain_spec(chain_spec)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<String> for ChainConfig<'a> {
|
||||
fn from(chain_spec: String) -> Self {
|
||||
ChainConfig::chain_spec(chain_spec)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ChainConfig<'a> {
|
||||
/// Construct a chain config from a chain spec.
|
||||
pub fn chain_spec(chain_spec: impl Into<Cow<'a, str>>) -> Self {
|
||||
ChainConfig {
|
||||
chain_spec: chain_spec.into(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the bootnodes to the given ones.
|
||||
pub fn set_bootnodes<S: AsRef<str>>(
|
||||
self,
|
||||
bootnodes: impl IntoIterator<Item = S>,
|
||||
) -> Result<Self, ChainConfigError> {
|
||||
let mut chain_spec_json: Value = serde_json::from_str(&self.chain_spec)
|
||||
.map_err(|_e| ChainConfigError::InvalidSpecFormat)?;
|
||||
|
||||
if let Value::Object(map) = &mut chain_spec_json {
|
||||
let bootnodes = bootnodes
|
||||
.into_iter()
|
||||
.map(|s| Value::String(s.as_ref().to_owned()))
|
||||
.collect();
|
||||
|
||||
map.insert("bootNodes".to_string(), Value::Array(bootnodes));
|
||||
} else {
|
||||
return Err(ChainConfigError::InvalidSpecFormat);
|
||||
}
|
||||
|
||||
Ok(ChainConfig {
|
||||
chain_spec: Cow::Owned(chain_spec_json.to_string()),
|
||||
})
|
||||
}
|
||||
|
||||
// Used internally to fetch the chain spec back out.
|
||||
pub(crate) fn as_chain_spec(&self) -> &str {
|
||||
&self.chain_spec
|
||||
}
|
||||
}
|
||||
@@ -1,212 +0,0 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
use std::iter;
|
||||
|
||||
use super::{
|
||||
background::{BackgroundTask, FromSubxt, MethodResponse},
|
||||
LightClientRpcError,
|
||||
};
|
||||
use serde_json::value::RawValue;
|
||||
use tokio::sync::{mpsc, mpsc::error::SendError, oneshot};
|
||||
|
||||
use super::platform::build_platform;
|
||||
|
||||
pub const LOG_TARGET: &str = "subxt-light-client";
|
||||
|
||||
/// A raw light-client RPC implementation that can connect to multiple chains.
|
||||
#[derive(Clone)]
|
||||
pub struct RawLightClientRpc {
|
||||
/// Communicate with the backend task that multiplexes the responses
|
||||
/// back to the frontend.
|
||||
to_backend: mpsc::UnboundedSender<FromSubxt>,
|
||||
}
|
||||
|
||||
impl RawLightClientRpc {
|
||||
/// Construct a [`LightClientRpc`] that can communicated with the provided chain.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// This uses the same underlying instance created by [`LightClientRpc::new_from_client`].
|
||||
pub fn for_chain(&self, chain_id: smoldot_light::ChainId) -> LightClientRpc {
|
||||
LightClientRpc {
|
||||
to_backend: self.to_backend.clone(),
|
||||
chain_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The light-client RPC implementation that is used to connect with the chain.
|
||||
#[derive(Clone)]
|
||||
pub struct LightClientRpc {
|
||||
/// Communicate with the backend task that multiplexes the responses
|
||||
/// back to the frontend.
|
||||
to_backend: mpsc::UnboundedSender<FromSubxt>,
|
||||
/// The chain ID to target for requests.
|
||||
chain_id: smoldot_light::ChainId,
|
||||
}
|
||||
|
||||
impl LightClientRpc {
|
||||
/// Constructs a new [`LightClientRpc`], providing the chain specification.
|
||||
///
|
||||
/// The chain specification can be downloaded from a trusted network via
|
||||
/// the `sync_state_genSyncSpec` RPC method. This parameter expects the
|
||||
/// chain spec in text format (ie not in hex-encoded scale-encoded as RPC methods
|
||||
/// will provide).
|
||||
///
|
||||
/// ## Panics
|
||||
///
|
||||
/// The panic behaviour depends on the feature flag being used:
|
||||
///
|
||||
/// ### Native
|
||||
///
|
||||
/// Panics when called outside of a `tokio` runtime context.
|
||||
///
|
||||
/// ### Web
|
||||
///
|
||||
/// If smoldot panics, then the promise created will be leaked. For more details, see
|
||||
/// https://docs.rs/wasm-bindgen-futures/latest/wasm_bindgen_futures/fn.future_to_promise.html.
|
||||
pub fn new(
|
||||
config: smoldot_light::AddChainConfig<
|
||||
'_,
|
||||
(),
|
||||
impl IntoIterator<Item = smoldot_light::ChainId>,
|
||||
>,
|
||||
) -> Result<LightClientRpc, LightClientRpcError> {
|
||||
tracing::trace!(target: LOG_TARGET, "Create light client");
|
||||
|
||||
let mut client = smoldot_light::Client::new(build_platform());
|
||||
|
||||
let config = smoldot_light::AddChainConfig {
|
||||
specification: config.specification,
|
||||
json_rpc: config.json_rpc,
|
||||
database_content: config.database_content,
|
||||
potential_relay_chains: config.potential_relay_chains.into_iter(),
|
||||
user_data: config.user_data,
|
||||
};
|
||||
|
||||
let smoldot_light::AddChainSuccess {
|
||||
chain_id,
|
||||
json_rpc_responses,
|
||||
} = client
|
||||
.add_chain(config)
|
||||
.map_err(|err| LightClientRpcError::AddChainError(err.to_string()))?;
|
||||
|
||||
let rpc_responses = json_rpc_responses.expect("Light client RPC configured; qed");
|
||||
|
||||
let raw_client = Self::new_from_client(
|
||||
client,
|
||||
iter::once(AddedChain {
|
||||
chain_id,
|
||||
rpc_responses,
|
||||
}),
|
||||
);
|
||||
Ok(raw_client.for_chain(chain_id))
|
||||
}
|
||||
|
||||
/// Constructs a new [`RawLightClientRpc`] from the raw smoldot client.
|
||||
///
|
||||
/// Receives a list of RPC objects as a result of calling `smoldot_light::Client::add_chain`.
|
||||
/// This [`RawLightClientRpc`] can target different chains using [`RawLightClientRpc::for_chain`] method.
|
||||
///
|
||||
/// ## Panics
|
||||
///
|
||||
/// The panic behaviour depends on the feature flag being used:
|
||||
///
|
||||
/// ### Native
|
||||
///
|
||||
/// Panics when called outside of a `tokio` runtime context.
|
||||
///
|
||||
/// ### Web
|
||||
///
|
||||
/// If smoldot panics, then the promise created will be leaked. For more details, see
|
||||
/// https://docs.rs/wasm-bindgen-futures/latest/wasm_bindgen_futures/fn.future_to_promise.html.
|
||||
pub fn new_from_client<TPlat>(
|
||||
client: smoldot_light::Client<TPlat>,
|
||||
chains: impl IntoIterator<Item = AddedChain>,
|
||||
) -> RawLightClientRpc
|
||||
where
|
||||
TPlat: smoldot_light::platform::PlatformRef + Clone,
|
||||
{
|
||||
let (to_backend, backend) = mpsc::unbounded_channel();
|
||||
let chains = chains.into_iter().collect();
|
||||
|
||||
let future = async move {
|
||||
let mut task = BackgroundTask::new(client);
|
||||
task.start_task(backend, chains).await;
|
||||
};
|
||||
|
||||
#[cfg(feature = "native")]
|
||||
tokio::spawn(future);
|
||||
#[cfg(feature = "web")]
|
||||
wasm_bindgen_futures::spawn_local(future);
|
||||
|
||||
RawLightClientRpc { to_backend }
|
||||
}
|
||||
|
||||
/// Returns the chain ID of the current light-client.
|
||||
pub fn chain_id(&self) -> smoldot_light::ChainId {
|
||||
self.chain_id
|
||||
}
|
||||
|
||||
/// Submits an RPC method request to the light-client.
|
||||
///
|
||||
/// This method sends a request to the light-client to execute an RPC method with the provided parameters.
|
||||
/// The parameters are parsed into a valid JSON object in the background.
|
||||
pub fn method_request(
|
||||
&self,
|
||||
method: String,
|
||||
params: String,
|
||||
) -> Result<oneshot::Receiver<MethodResponse>, SendError<FromSubxt>> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
self.to_backend.send(FromSubxt::Request {
|
||||
method,
|
||||
params,
|
||||
sender,
|
||||
chain_id: self.chain_id,
|
||||
})?;
|
||||
|
||||
Ok(receiver)
|
||||
}
|
||||
|
||||
/// Makes an RPC subscription call to the light-client.
|
||||
///
|
||||
/// This method sends a request to the light-client to establish an RPC subscription with the provided parameters.
|
||||
/// The parameters are parsed into a valid JSON object in the background.
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn subscription_request(
|
||||
&self,
|
||||
method: String,
|
||||
params: String,
|
||||
unsubscribe_method: String,
|
||||
) -> Result<
|
||||
(
|
||||
oneshot::Receiver<MethodResponse>,
|
||||
mpsc::UnboundedReceiver<Box<RawValue>>,
|
||||
),
|
||||
SendError<FromSubxt>,
|
||||
> {
|
||||
let (sub_id, sub_id_rx) = oneshot::channel();
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
|
||||
self.to_backend.send(FromSubxt::Subscription {
|
||||
method,
|
||||
unsubscribe_method,
|
||||
params,
|
||||
sub_id,
|
||||
sender,
|
||||
chain_id: self.chain_id,
|
||||
})?;
|
||||
|
||||
Ok((sub_id_rx, receiver))
|
||||
}
|
||||
}
|
||||
|
||||
/// The added chain of the light-client.
|
||||
pub struct AddedChain {
|
||||
/// The id of the chain.
|
||||
pub chain_id: smoldot_light::ChainId,
|
||||
/// Producer of RPC responses for the chain.
|
||||
pub rpc_responses: smoldot_light::JsonRpcResponses,
|
||||
}
|
||||
+239
-32
@@ -2,52 +2,259 @@
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
//! Low level light client implementation for RPC method and
|
||||
//! subscriptions requests.
|
||||
//!
|
||||
//! The client implementation supports both native and wasm
|
||||
//! environments.
|
||||
//!
|
||||
//! This leverages the smoldot crate to connect to the chain.
|
||||
//! A wrapper around [`smoldot_light`] which provides an light client capable of connecting
|
||||
//! to Substrate based chains.
|
||||
|
||||
#![deny(missing_docs)]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))]
|
||||
|
||||
#[cfg(any(
|
||||
all(feature = "web", feature = "native"),
|
||||
not(any(feature = "web", feature = "native"))
|
||||
))]
|
||||
compile_error!("subxt: exactly one of the 'web' and 'native' features should be used.");
|
||||
compile_error!("subxt-lightclient: exactly one of the 'web' and 'native' features should be used.");
|
||||
|
||||
mod background;
|
||||
mod client;
|
||||
mod platform;
|
||||
mod shared_client;
|
||||
// mod receiver;
|
||||
mod background;
|
||||
mod chain_config;
|
||||
mod rpc;
|
||||
|
||||
// Used to enable the js feature for wasm.
|
||||
#[cfg(feature = "web")]
|
||||
#[allow(unused_imports)]
|
||||
pub use getrandom as _;
|
||||
use background::{BackgroundTask, BackgroundTaskHandle};
|
||||
use futures::Stream;
|
||||
use platform::DefaultPlatform;
|
||||
use serde_json::value::RawValue;
|
||||
use shared_client::SharedClient;
|
||||
use std::future::Future;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub use client::{AddedChain, LightClientRpc, RawLightClientRpc};
|
||||
pub use chain_config::{ChainConfig, ChainConfigError};
|
||||
|
||||
/// Re-exports of the smoldot related objects.
|
||||
pub mod smoldot {
|
||||
pub use smoldot_light::{
|
||||
platform::PlatformRef, AddChainConfig, AddChainConfigJsonRpc, ChainId, Client,
|
||||
JsonRpcResponses,
|
||||
};
|
||||
|
||||
#[cfg(feature = "native")]
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "native")))]
|
||||
pub use smoldot_light::platform::default::DefaultPlatform;
|
||||
}
|
||||
|
||||
/// Light client error.
|
||||
/// Things that can go wrong when constructing the [`LightClient`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum LightClientRpcError {
|
||||
pub enum LightClientError {
|
||||
/// Error encountered while adding the chain to the light-client.
|
||||
#[error("Failed to add the chain to the light client: {0}.")]
|
||||
AddChainError(String),
|
||||
/// Error originated while trying to submit a RPC request.
|
||||
#[error("RPC request cannot be sent: {0}.")]
|
||||
Request(String),
|
||||
}
|
||||
|
||||
/// Things that can go wrong calling methods of [`LightClientRpc`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum LightClientRpcError {
|
||||
/// Error response from the JSON-RPC server.
|
||||
#[error("{0}")]
|
||||
JsonRpcError(JsonRpcError),
|
||||
/// Smoldot could not handle the RPC call.
|
||||
#[error("Smoldot could not handle the RPC call: {0}.")]
|
||||
SmoldotError(String),
|
||||
/// Background task dropped.
|
||||
#[error("The background task was dropped.")]
|
||||
BackgroundTaskDropped,
|
||||
}
|
||||
|
||||
/// An error response from the JSON-RPC server (ie smoldot) in response to
|
||||
/// a method call or as a subscription notification.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("RPC Error: {0}.")]
|
||||
pub struct JsonRpcError(Box<RawValue>);
|
||||
|
||||
/// This represents a single light client connection to the network. Instantiate
|
||||
/// it with [`LightClient::relay_chain()`] to communicate with a relay chain, and
|
||||
/// then call [`LightClient::parachain()`] to establish connections to parachains.
|
||||
#[derive(Clone)]
|
||||
pub struct LightClient {
|
||||
client: SharedClient<DefaultPlatform>,
|
||||
relay_chain_id: smoldot_light::ChainId,
|
||||
}
|
||||
|
||||
impl LightClient {
|
||||
/// Given a chain spec, establish a connection to a relay chain. Any subsequent calls to
|
||||
/// [`LightClient::parachain()`] will set this as the relay chain.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The panic behaviour depends on the feature flag being used:
|
||||
///
|
||||
/// ## Native
|
||||
///
|
||||
/// Panics when called outside of a `tokio` runtime context.
|
||||
///
|
||||
/// ## Web
|
||||
///
|
||||
/// If smoldot panics, then the promise created will be leaked. For more details, see
|
||||
/// https://docs.rs/wasm-bindgen-futures/latest/wasm_bindgen_futures/fn.future_to_promise.html.
|
||||
pub fn relay_chain<'a>(
|
||||
chain_config: impl Into<ChainConfig<'a>>,
|
||||
) -> Result<(Self, LightClientRpc), LightClientError> {
|
||||
let mut client = smoldot_light::Client::new(platform::build_platform());
|
||||
let chain_config = chain_config.into();
|
||||
let chain_spec = chain_config.as_chain_spec();
|
||||
|
||||
let config = smoldot_light::AddChainConfig {
|
||||
specification: chain_spec,
|
||||
json_rpc: smoldot_light::AddChainConfigJsonRpc::Enabled {
|
||||
max_pending_requests: u32::MAX.try_into().unwrap(),
|
||||
max_subscriptions: u32::MAX,
|
||||
},
|
||||
database_content: "",
|
||||
potential_relay_chains: std::iter::empty(),
|
||||
user_data: (),
|
||||
};
|
||||
|
||||
let added_chain = client
|
||||
.add_chain(config)
|
||||
.map_err(|err| LightClientError::AddChainError(err.to_string()))?;
|
||||
|
||||
let relay_chain_id = added_chain.chain_id;
|
||||
let rpc_responses = added_chain
|
||||
.json_rpc_responses
|
||||
.expect("Light client RPC configured; qed");
|
||||
let shared_client: SharedClient<_> = client.into();
|
||||
|
||||
let light_client_rpc =
|
||||
LightClientRpc::new_raw(shared_client.clone(), relay_chain_id, rpc_responses);
|
||||
let light_client = Self {
|
||||
client: shared_client,
|
||||
relay_chain_id,
|
||||
};
|
||||
|
||||
Ok((light_client, light_client_rpc))
|
||||
}
|
||||
|
||||
/// Given a chain spec, establish a connection to a parachain.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// The panic behaviour depends on the feature flag being used:
|
||||
///
|
||||
/// ## Native
|
||||
///
|
||||
/// Panics when called outside of a `tokio` runtime context.
|
||||
///
|
||||
/// ## Web
|
||||
///
|
||||
/// If smoldot panics, then the promise created will be leaked. For more details, see
|
||||
/// https://docs.rs/wasm-bindgen-futures/latest/wasm_bindgen_futures/fn.future_to_promise.html.
|
||||
pub fn parachain<'a>(
|
||||
&self,
|
||||
chain_config: impl Into<ChainConfig<'a>>,
|
||||
) -> Result<LightClientRpc, LightClientError> {
|
||||
let chain_config = chain_config.into();
|
||||
let chain_spec = chain_config.as_chain_spec();
|
||||
|
||||
let config = smoldot_light::AddChainConfig {
|
||||
specification: chain_spec,
|
||||
json_rpc: smoldot_light::AddChainConfigJsonRpc::Enabled {
|
||||
max_pending_requests: u32::MAX.try_into().unwrap(),
|
||||
max_subscriptions: u32::MAX,
|
||||
},
|
||||
database_content: "",
|
||||
potential_relay_chains: std::iter::once(self.relay_chain_id),
|
||||
user_data: (),
|
||||
};
|
||||
|
||||
let added_chain = self
|
||||
.client
|
||||
.add_chain(config)
|
||||
.map_err(|err| LightClientError::AddChainError(err.to_string()))?;
|
||||
|
||||
let chain_id = added_chain.chain_id;
|
||||
let rpc_responses = added_chain
|
||||
.json_rpc_responses
|
||||
.expect("Light client RPC configured; qed");
|
||||
|
||||
Ok(LightClientRpc::new_raw(
|
||||
self.client.clone(),
|
||||
chain_id,
|
||||
rpc_responses,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// This represents a single RPC connection to a specific chain, and is constructed by calling
|
||||
/// one of the methods on [`LightClient`]. Using this, you can make RPC requests to the chain.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LightClientRpc {
|
||||
handle: BackgroundTaskHandle,
|
||||
}
|
||||
|
||||
impl LightClientRpc {
|
||||
// Dev note: this would provide a "low leveL" interface if one is needed.
|
||||
// Do we actually need to provide this, or can we entirely hide Smoldot?
|
||||
pub(crate) fn new_raw<TPlat, TChain>(
|
||||
client: impl Into<SharedClient<TPlat, TChain>>,
|
||||
chain_id: smoldot_light::ChainId,
|
||||
rpc_responses: smoldot_light::JsonRpcResponses,
|
||||
) -> Self
|
||||
where
|
||||
TPlat: smoldot_light::platform::PlatformRef + Send + 'static,
|
||||
TChain: Send + 'static,
|
||||
{
|
||||
let (background_task, background_handle) =
|
||||
BackgroundTask::new(client.into(), chain_id, rpc_responses);
|
||||
|
||||
// For now we spawn the background task internally, but later we can expose
|
||||
// methods to give this back to the user so that they can exert backpressure.
|
||||
spawn(async move { background_task.run().await });
|
||||
|
||||
LightClientRpc {
|
||||
handle: background_handle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Make an RPC request to a chain, getting back a result.
|
||||
pub async fn request(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
) -> Result<Box<RawValue>, LightClientRpcError> {
|
||||
self.handle.request(method, params).await
|
||||
}
|
||||
|
||||
/// Subscribe to some RPC method, getting back a stream of notifications.
|
||||
pub async fn subscribe(
|
||||
&self,
|
||||
method: String,
|
||||
params: Option<Box<RawValue>>,
|
||||
unsub: String,
|
||||
) -> Result<LightClientRpcSubscription, LightClientRpcError> {
|
||||
let (id, notifications) = self.handle.subscribe(method, params, unsub).await?;
|
||||
Ok(LightClientRpcSubscription { id, notifications })
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream of notifications handed back when [`LightClientRpc::subscribe`] is called.
|
||||
pub struct LightClientRpcSubscription {
|
||||
notifications: mpsc::UnboundedReceiver<Result<Box<RawValue>, JsonRpcError>>,
|
||||
id: String,
|
||||
}
|
||||
|
||||
impl LightClientRpcSubscription {
|
||||
/// Return the subscription ID
|
||||
pub fn id(&self) -> &str {
|
||||
&self.id
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for LightClientRpcSubscription {
|
||||
type Item = Result<Box<RawValue>, JsonRpcError>;
|
||||
fn poll_next(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.notifications.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// A quick helper to spawn a task that works for WASM.
|
||||
fn spawn<F: Future + Send + 'static>(future: F) {
|
||||
#[cfg(feature = "native")]
|
||||
tokio::spawn(async move {
|
||||
future.await;
|
||||
});
|
||||
#[cfg(feature = "web")]
|
||||
wasm_bindgen_futures::spawn_local(async move {
|
||||
future.await;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -11,16 +11,16 @@ mod wasm_platform;
|
||||
#[cfg(feature = "web")]
|
||||
mod wasm_socket;
|
||||
|
||||
pub use helpers::build_platform;
|
||||
pub use helpers::{build_platform, DefaultPlatform};
|
||||
|
||||
#[cfg(feature = "native")]
|
||||
mod helpers {
|
||||
use smoldot_light::platform::default::DefaultPlatform as Platform;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub type PlatformType = Arc<Platform>;
|
||||
pub type DefaultPlatform = Arc<Platform>;
|
||||
|
||||
pub fn build_platform() -> PlatformType {
|
||||
pub fn build_platform() -> DefaultPlatform {
|
||||
Platform::new(
|
||||
"subxt-light-client".into(),
|
||||
env!("CARGO_PKG_VERSION").into(),
|
||||
@@ -32,9 +32,9 @@ mod helpers {
|
||||
mod helpers {
|
||||
use super::wasm_platform::SubxtPlatform as Platform;
|
||||
|
||||
pub type PlatformType = Platform;
|
||||
pub type DefaultPlatform = Platform;
|
||||
|
||||
pub fn build_platform() -> PlatformType {
|
||||
pub fn build_platform() -> DefaultPlatform {
|
||||
Platform::new()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
/// The RPC response from the light-client.
|
||||
/// This can either be a response of a method, or a notification from a subscription.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum RpcResponse {
|
||||
Method {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// The result of the method call.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
MethodError {
|
||||
/// Response ID.
|
||||
id: String,
|
||||
/// Error.
|
||||
error: Box<RawValue>,
|
||||
},
|
||||
Notification {
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Subscription ID.
|
||||
subscription_id: String,
|
||||
/// Result.
|
||||
result: Box<RawValue>,
|
||||
},
|
||||
NotificationError {
|
||||
/// RPC method that generated the notification.
|
||||
method: String,
|
||||
/// Subscription ID.
|
||||
subscription_id: String,
|
||||
/// Result.
|
||||
error: Box<RawValue>,
|
||||
},
|
||||
}
|
||||
|
||||
impl std::str::FromStr for RpcResponse {
|
||||
type Err = ();
|
||||
|
||||
fn from_str(response: &str) -> Result<Self, Self::Err> {
|
||||
// Valid response
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct Response {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
id: String,
|
||||
result: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Error response
|
||||
#[derive(Deserialize)]
|
||||
struct ResponseError {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
id: String,
|
||||
error: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Valid notification (subscription) response
|
||||
#[derive(Deserialize)]
|
||||
struct Notification {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: NotificationResultParams,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationResultParams {
|
||||
subscription: String,
|
||||
result: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Error notification (subscription) response
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationError {
|
||||
#[allow(unused)]
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
params: NotificationErrorParams,
|
||||
}
|
||||
#[derive(Deserialize)]
|
||||
struct NotificationErrorParams {
|
||||
/// The ID of the subscription.
|
||||
subscription: String,
|
||||
error: Box<RawValue>,
|
||||
}
|
||||
|
||||
// Try deserializing the response payload to one of the above. We can
|
||||
// do this more efficiently eg how jsonrpsee_types does.
|
||||
|
||||
let result: Result<Response, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::Method {
|
||||
id: response.id,
|
||||
result: response.result,
|
||||
});
|
||||
}
|
||||
let result: Result<Notification, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::Notification {
|
||||
subscription_id: response.params.subscription,
|
||||
method: response.method,
|
||||
result: response.params.result,
|
||||
});
|
||||
}
|
||||
let result: Result<ResponseError, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::MethodError {
|
||||
id: response.id,
|
||||
error: response.error,
|
||||
});
|
||||
}
|
||||
let result: Result<NotificationError, _> = serde_json::from_str(response);
|
||||
if let Ok(response) = result {
|
||||
return Ok(RpcResponse::NotificationError {
|
||||
method: response.method,
|
||||
subscription_id: response.params.subscription,
|
||||
error: response.params.error,
|
||||
});
|
||||
}
|
||||
|
||||
// We couldn't decode into any of the above. We could pick one of the above`
|
||||
// errors to return, but there's no real point since the string is obviously
|
||||
// different from any of them.
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
|
||||
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
|
||||
// see LICENSE for license details.
|
||||
|
||||
use smoldot_light as sl;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
/// This wraps [`smoldot_light::Client`] so that it can be cloned and shared.
|
||||
#[derive(Clone)]
|
||||
pub struct SharedClient<TPlat: sl::platform::PlatformRef, TChain = ()> {
|
||||
client: Arc<Mutex<sl::Client<TPlat, TChain>>>,
|
||||
}
|
||||
|
||||
impl<TPlat: sl::platform::PlatformRef, TChain> From<sl::Client<TPlat, TChain>>
|
||||
for SharedClient<TPlat, TChain>
|
||||
{
|
||||
fn from(client: sl::Client<TPlat, TChain>) -> Self {
|
||||
SharedClient {
|
||||
client: Arc::new(Mutex::new(client)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<TPlat: sl::platform::PlatformRef, TChain> SharedClient<TPlat, TChain> {
|
||||
/// Delegates to [`smoldot_light::Client::json_rpc_request()`].
|
||||
pub(crate) fn json_rpc_request(
|
||||
&self,
|
||||
json_rpc_request: impl Into<String>,
|
||||
chain_id: sl::ChainId,
|
||||
) -> Result<(), sl::HandleRpcError> {
|
||||
self.client
|
||||
.lock()
|
||||
.expect("mutex should not be poisoned")
|
||||
.json_rpc_request(json_rpc_request, chain_id)
|
||||
}
|
||||
|
||||
/// Delegates to [`smoldot_light::Client::add_chain()`].
|
||||
pub(crate) fn add_chain(
|
||||
&self,
|
||||
config: sl::AddChainConfig<'_, TChain, impl Iterator<Item = sl::ChainId>>,
|
||||
) -> Result<sl::AddChainSuccess, sl::AddChainError> {
|
||||
self.client
|
||||
.lock()
|
||||
.expect("mutex should not be poisoned")
|
||||
.add_chain(config)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user