lightclient: Add support for multi-chain usecase (#1238)

* lightclient: Make `smoldot::chainID` part of the RPC requests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Make `BackgroundTask` generic over `PlatformRef` and chain

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Construct from raw smoldot and target different chains

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* testing: Update cargo lock for wasm tests

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Reuse `new_from_client` method and removed unused imports

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Reexport smoldot client and RPC objects used in pub
interface

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Adjust `new_from_client` interface

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Extend background to poll over multiple RPC objects

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Build light client from raw and target different chains

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* artifacts: Add demo chain specs

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* artifacts: Move artifacts to dedicated folder

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Use SelectAll to drive all streams

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Fetch initial data from the target chain

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Reexport other smoldot objects

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Target chain with potentially different config

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt/rpc: Log chainID for debugging

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt/examples: Add smoldot client with parachain example

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Propagate chain ID together with rpc responses object

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Multiplex responses by request ID and chain ID

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Add raw light client builder

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Add cargo feature flag for parachains example

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Derive default for internal structure

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Guard reexports by std feature flag

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update subxt/src/client/light_client/mod.rs

Co-authored-by: James Wilson <james@jsdw.me>

* lightclient: Update the builder pattern and chain targetting

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Fix documentation

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Provide more insightful docs wrt native/wasm panics

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* examples: Adjust comment location

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Refactor UniqueChainId into the background task

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* Update lightclient/src/background.rs

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

* Update subxt/src/client/light_client/builder.rs

Co-authored-by: James Wilson <james@jsdw.me>

* lightclient: Update docs wrt panics

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Update docs wrt to smoldot instance -> client

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Use IntoIter instead of Iterator

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Adjsut docs wrt [`Self::new_from_client`]

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* subxt: Remove RawRpc from LightClient in favor of chainID

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Reexport everything under smoldot module

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* artifacts: Use stateRootHash instead of genesis.raw

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: James Wilson <james@jsdw.me>
Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
Alexandru Vasile
2023-11-16 18:29:00 +02:00
committed by GitHub
parent 7b210f5a8e
commit 34ff3da37d
11 changed files with 606 additions and 104 deletions
+95 -65
View File
@@ -6,14 +6,16 @@ use futures::stream::StreamExt;
use futures_util::future::{self, Either};
use serde::Deserialize;
use serde_json::value::RawValue;
use smoldot_light::platform::PlatformRef;
use std::{collections::HashMap, str::FromStr};
use tokio::sync::{mpsc, oneshot};
use super::platform::PlatformType;
use crate::client::AddedChain;
use super::LightClientRpcError;
use smoldot_light::ChainId;
const LOG_TARGET: &str = "light-client-background";
const LOG_TARGET: &str = "subxt-light-client-background";
/// The response of an RPC method.
pub type MethodResponse = Result<Box<RawValue>, LightClientRpcError>;
@@ -34,6 +36,8 @@ pub enum FromSubxt {
params: String,
/// Channel used to send back the result.
sender: oneshot::Sender<MethodResponse>,
/// The ID of the chain used to identify the chain.
chain_id: ChainId,
},
/// The RPC subscription (pub/sub) request.
Subscription {
@@ -43,24 +47,21 @@ pub enum FromSubxt {
params: String,
/// Channel used to send back the subscription ID if successful.
sub_id: oneshot::Sender<MethodResponse>,
/// Channel used to send back the notifcations.
/// Channel used to send back the notifications.
sender: mpsc::UnboundedSender<Box<RawValue>>,
/// The ID of the chain used to identify the chain.
chain_id: ChainId,
},
}
/// Background task data.
pub struct BackgroundTask {
pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
/// Smoldot light client implementation that leverages the exposed platform.
client: smoldot_light::Client<PlatformType>,
/// The ID of the chain used to identify the chain protocol (ie. substrate).
///
/// Note: A single chain is supported for a client. This aligns with the subxt's
/// vision of the Client.
chain_id: ChainId,
/// Unique ID for RPC calls.
request_id: usize,
client: smoldot_light::Client<TPlatform, TChain>,
/// Generates an unique monotonically increasing ID for each chain.
request_id_per_chain: HashMap<smoldot_light::ChainId, usize>,
/// Map the request ID of a RPC method to the frontend `Sender`.
requests: HashMap<usize, oneshot::Sender<MethodResponse>>,
requests: HashMap<(usize, smoldot_light::ChainId), oneshot::Sender<MethodResponse>>,
/// Subscription calls first need to make a plain RPC method
/// request to obtain the subscription ID.
///
@@ -68,23 +69,27 @@ pub struct BackgroundTask {
/// not be sent back to the user.
/// Map the request ID of a RPC method to the frontend `Sender`.
id_to_subscription: HashMap<
usize,
(usize, smoldot_light::ChainId),
(
oneshot::Sender<MethodResponse>,
mpsc::UnboundedSender<Box<RawValue>>,
),
>,
/// Map the subscription ID to the frontend `Sender`.
subscriptions: HashMap<usize, mpsc::UnboundedSender<Box<RawValue>>>,
///
/// The subscription ID is entirely generated by the node (smoldot). Therefore, it is
/// possible for two distinct subscriptions of different chains to have the same subscription ID.
subscriptions: HashMap<(usize, smoldot_light::ChainId), mpsc::UnboundedSender<Box<RawValue>>>,
}
impl BackgroundTask {
impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
/// Constructs a new [`BackgroundTask`].
pub fn new(client: smoldot_light::Client<PlatformType>, chain_id: ChainId) -> BackgroundTask {
pub fn new(
client: smoldot_light::Client<TPlatform, TChain>,
) -> BackgroundTask<TPlatform, TChain> {
BackgroundTask {
client,
chain_id,
request_id: 1,
request_id_per_chain: Default::default(),
requests: Default::default(),
id_to_subscription: Default::default(),
subscriptions: Default::default(),
@@ -92,10 +97,11 @@ impl BackgroundTask {
}
/// Fetch and increment the request ID.
fn next_id(&mut self) -> usize {
let next = self.request_id;
self.request_id = self.request_id.wrapping_add(1);
next
fn next_id(&mut self, chain_id: smoldot_light::ChainId) -> usize {
let next = self.request_id_per_chain.entry(chain_id).or_insert(1);
let id = *next;
*next = next.wrapping_add(1);
id
}
/// Handle the registration messages received from the user.
@@ -105,27 +111,28 @@ impl BackgroundTask {
method,
params,
sender,
chain_id,
} => {
let id = self.next_id();
let id = self.next_id(chain_id);
let request = format!(
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
id, method, params
);
self.requests.insert(id, sender);
self.requests.insert((id, chain_id), sender);
tracing::trace!(target: LOG_TARGET, "Tracking request id={id} chain={chain_id:?}");
tracing::trace!(target: LOG_TARGET, "Generated unique id={id} for request={request}");
let result = self.client.json_rpc_request(request, self.chain_id);
let result = self.client.json_rpc_request(request, chain_id);
if let Err(err) = result {
tracing::warn!(
target: LOG_TARGET,
"Cannot send RPC request to lightclient {:?}",
err.to_string()
);
let sender = self
.requests
.remove(&id)
.remove(&(id, chain_id))
.expect("Channel is inserted above; qed");
// Send the error back to frontend.
@@ -147,20 +154,21 @@ impl BackgroundTask {
params,
sub_id,
sender,
chain_id,
} => {
// For subscriptions we need to make a plain RPC request to the subscription method.
// The server will return as a result the subscription ID.
let id = self.next_id();
let id = self.next_id(chain_id);
let request = format!(
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
id, method, params
);
self.id_to_subscription.insert(id, (sub_id, sender));
tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}");
self.id_to_subscription
.insert((id, chain_id), (sub_id, sender));
tracing::trace!(target: LOG_TARGET, "Generated unique id={id} for subscription request={request}");
let result = self.client.json_rpc_request(request, self.chain_id);
let result = self.client.json_rpc_request(request, chain_id);
if let Err(err) = result {
tracing::warn!(
target: LOG_TARGET,
@@ -169,7 +177,7 @@ impl BackgroundTask {
);
let (sub_id, _) = self
.id_to_subscription
.remove(&id)
.remove(&(id, chain_id))
.expect("Channels are inserted above; qed");
// Send the error back to frontend.
@@ -190,54 +198,57 @@ impl BackgroundTask {
}
/// Parse the response received from the light client and sent it to the appropriate user.
fn handle_rpc_response(&mut self, response: String) {
tracing::trace!(target: LOG_TARGET, "Received from smoldot response={:?}", response);
fn handle_rpc_response(&mut self, chain_id: smoldot_light::ChainId, response: String) {
tracing::trace!(target: LOG_TARGET, "Received from smoldot response={response} chain={chain_id:?}");
match RpcResponse::from_str(&response) {
Ok(RpcResponse::Error { id, error }) => {
let Ok(id) = id.parse::<usize>() else {
tracing::warn!(target: LOG_TARGET, "Cannot send error. Id={id} is not a valid number");
tracing::warn!(target: LOG_TARGET, "Cannot send error. Id={id} chain={chain_id:?} is not a valid number");
return;
};
if let Some(sender) = self.requests.remove(&id) {
if let Some(sender) = self.requests.remove(&(id, chain_id)) {
if sender
.send(Err(LightClientRpcError::Request(error.to_string())))
.is_err()
{
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id={id}",
"Cannot send method response to id={id} chain={chain_id:?}",
);
}
} else if let Some((sub_id_sender, _)) = self.id_to_subscription.remove(&id) {
} else if let Some((sub_id_sender, _)) =
self.id_to_subscription.remove(&(id, chain_id))
{
if sub_id_sender
.send(Err(LightClientRpcError::Request(error.to_string())))
.is_err()
{
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id {:?}",
id
"Cannot send method response to id {id} chain={chain_id:?}",
);
}
}
}
Ok(RpcResponse::Method { id, result }) => {
let Ok(id) = id.parse::<usize>() else {
tracing::warn!(target: LOG_TARGET, "Cannot send response. Id={id} is not a valid number");
tracing::warn!(target: LOG_TARGET, "Cannot send response. Id={id} chain={chain_id:?} is not a valid number");
return;
};
// Send the response back.
if let Some(sender) = self.requests.remove(&id) {
if let Some(sender) = self.requests.remove(&(id, chain_id)) {
if sender.send(Ok(result)).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id={id}",
"Cannot send method response to id={id} chain={chain_id:?}",
);
}
} else if let Some((sub_id_sender, sender)) = self.id_to_subscription.remove(&id) {
} else if let Some((sub_id_sender, sender)) =
self.id_to_subscription.remove(&(id, chain_id))
{
let Ok(sub_id) = result
.get()
.trim_start_matches('"')
@@ -246,41 +257,51 @@ impl BackgroundTask {
else {
tracing::warn!(
target: LOG_TARGET,
"Subscription id={result} is not a valid number",
"Subscription id={result} chain={chain_id:?} is not a valid number",
);
return;
};
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id}");
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");
if sub_id_sender.send(Ok(result)).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id={id}",
"Cannot send method response to id={id} chain={chain_id:?}",
);
} else {
// Track this subscription ID if send is successful.
self.subscriptions.insert(sub_id, sender);
self.subscriptions.insert((sub_id, chain_id), sender);
}
} else {
tracing::warn!(
target: LOG_TARGET,
"Response id={id} chain={chain_id:?} is not tracked",
);
}
}
Ok(RpcResponse::Subscription { method, id, result }) => {
let Ok(id) = id.parse::<usize>() else {
tracing::warn!(target: LOG_TARGET, "Cannot send subscription. Id={id} is not a valid number");
tracing::warn!(target: LOG_TARGET, "Cannot send subscription. Id={id} chain={chain_id:?} is not a valid number");
return;
};
if let Some(sender) = self.subscriptions.get_mut(&id) {
if let Some(sender) = self.subscriptions.get_mut(&(id, chain_id)) {
// Send the current notification response.
if sender.send(result).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send notification to subscription id={id} method={method}",
"Cannot send notification to subscription id={id} chain={chain_id:?} method={method}",
);
// Remove the sender if the subscription dropped the receiver.
self.subscriptions.remove(&id);
self.subscriptions.remove(&(id, chain_id));
}
} else {
tracing::warn!(
target: LOG_TARGET,
"Subscription response id={id} chain={chain_id:?} is not tracked",
);
}
}
Err(err) => {
@@ -295,17 +316,22 @@ impl BackgroundTask {
pub async fn start_task(
&mut self,
from_subxt: mpsc::UnboundedReceiver<FromSubxt>,
from_node: smoldot_light::JsonRpcResponses,
from_node: Vec<AddedChain>,
) {
let from_subxt_event = tokio_stream::wrappers::UnboundedReceiverStream::new(from_subxt);
let from_node_event = futures_util::stream::unfold(from_node, |mut from_node| async {
from_node.next().await.map(|result| (result, from_node))
});
tokio::pin!(from_subxt_event, from_node_event);
let from_node = from_node.into_iter().map(|rpc| {
Box::pin(futures::stream::unfold(rpc, |mut rpc| async move {
let response = rpc.rpc_responses.next().await;
Some(((response, rpc.chain_id), rpc))
}))
});
let stream_combinator = futures::stream::select_all(from_node);
tokio::pin!(from_subxt_event, stream_combinator);
let mut from_subxt_event_fut = from_subxt_event.next();
let mut from_node_event_fut = from_node_event.next();
let mut from_node_event_fut = stream_combinator.next();
loop {
match future::select(from_subxt_event_fut, from_node_event_fut).await {
@@ -328,6 +354,10 @@ impl BackgroundTask {
}
// Message received from rpc handler: lightclient response.
Either::Right((node_message, previous_fut)) => {
let Some((node_message, chain)) = node_message else {
tracing::trace!(target: LOG_TARGET, "Smoldot closed all RPC channels");
break;
};
// Smoldot returns `None` if the chain has been removed (which subxt does not remove).
let Some(response) = node_message else {
tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed");
@@ -335,15 +365,15 @@ impl BackgroundTask {
};
tracing::trace!(
target: LOG_TARGET,
"Received smoldot RPC result {:?}",
response
"Received smoldot RPC chain {:?} result {:?}",
chain, response
);
self.handle_rpc_response(response);
self.handle_rpc_response(chain, response);
// Advance backend, save frontend.
from_subxt_event_fut = previous_fut;
from_node_event_fut = from_node_event.next();
from_node_event_fut = stream_combinator.next();
}
}
}