mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-13 05:51:02 +00:00
Merge remote-tracking branch 'origin/master' into lexnv/update-smoldot
This commit is contained in:
Generated
+10
-10
@@ -2882,18 +2882,18 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
||||
|
||||
[[package]]
|
||||
name = "pin-project"
|
||||
version = "1.1.3"
|
||||
version = "1.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422"
|
||||
checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0"
|
||||
dependencies = [
|
||||
"pin-project-internal",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pin-project-internal"
|
||||
version = "1.1.3"
|
||||
version = "1.1.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
|
||||
checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -3717,9 +3717,9 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.195"
|
||||
version = "1.0.196"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02"
|
||||
checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
@@ -3735,9 +3735,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.195"
|
||||
version = "1.0.196"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c"
|
||||
checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -3746,9 +3746,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "serde_json"
|
||||
version = "1.0.111"
|
||||
version = "1.0.113"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4"
|
||||
checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79"
|
||||
dependencies = [
|
||||
"itoa",
|
||||
"ryu",
|
||||
|
||||
+3
-3
@@ -83,8 +83,8 @@ scale-value = "0.13.0"
|
||||
scale-bits = "0.4.0"
|
||||
scale-decode = "0.10.0"
|
||||
scale-encode = "0.5.0"
|
||||
serde = { version = "1.0.195" }
|
||||
serde_json = { version = "1.0.108" }
|
||||
serde = { version = "1.0.196" }
|
||||
serde_json = { version = "1.0.113" }
|
||||
syn = { version = "2.0.15", features = ["full", "extra-traits"] }
|
||||
thiserror = "1.0.53"
|
||||
tokio = { version = "1.35", default-features = false }
|
||||
@@ -106,7 +106,7 @@ smoldot-light = { version = "0.15.0", default-features = false }
|
||||
tokio-stream = "0.1.14"
|
||||
futures-util = "0.3.30"
|
||||
rand = "0.8.5"
|
||||
pin-project = "1.1.3"
|
||||
pin-project = "1.1.4"
|
||||
|
||||
# Light client wasm:
|
||||
web-sys = { version = "0.3.67", features = ["BinaryType", "CloseEvent", "MessageEvent", "WebSocket"] }
|
||||
|
||||
@@ -43,6 +43,8 @@ pub enum FromSubxt {
|
||||
Subscription {
|
||||
/// The method of the request.
|
||||
method: String,
|
||||
/// The method to unsubscribe.
|
||||
unsubscribe_method: String,
|
||||
/// The parameters of the request.
|
||||
params: String,
|
||||
/// Channel used to send back the subscription ID if successful.
|
||||
@@ -69,18 +71,40 @@ pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
|
||||
/// The RPC method request is made in the background and the response should
|
||||
/// not be sent back to the user.
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
id_to_subscription: HashMap<
|
||||
(usize, smoldot_light::ChainId),
|
||||
(
|
||||
oneshot::Sender<MethodResponse>,
|
||||
mpsc::UnboundedSender<Box<RawValue>>,
|
||||
),
|
||||
>,
|
||||
id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>,
|
||||
/// Map the subscription ID to the frontend `Sender`.
|
||||
///
|
||||
/// The subscription ID is entirely generated by the node (smoldot). Therefore, it is
|
||||
/// possible for two distinct subscriptions of different chains to have the same subscription ID.
|
||||
subscriptions: HashMap<(usize, smoldot_light::ChainId), mpsc::UnboundedSender<Box<RawValue>>>,
|
||||
subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>,
|
||||
}
|
||||
|
||||
/// The state needed to resolve the subscription ID and send
|
||||
/// back the response to frontend.
|
||||
struct PendingSubscription {
|
||||
/// Send the method response ID back to the user.
|
||||
///
|
||||
/// It contains the subscription ID if successful, or an JSON RPC error object.
|
||||
sub_id_sender: oneshot::Sender<MethodResponse>,
|
||||
/// The subscription state that is added to the `subscriptions` map only
|
||||
/// if the subscription ID is successfully sent back to the user.
|
||||
subscription_state: ActiveSubscription,
|
||||
}
|
||||
|
||||
impl PendingSubscription {
|
||||
/// Transforms the pending subscription into an active subscription.
|
||||
fn into_parts(self) -> (oneshot::Sender<MethodResponse>, ActiveSubscription) {
|
||||
(self.sub_id_sender, self.subscription_state)
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of the subscription.
|
||||
struct ActiveSubscription {
|
||||
/// Channel to send the subscription notifications back to frontend.
|
||||
sender: mpsc::UnboundedSender<Box<RawValue>>,
|
||||
/// The unsubscribe method to call when the user drops the receiver
|
||||
/// part of the channel.
|
||||
unsubscribe_method: String,
|
||||
}
|
||||
|
||||
impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
@@ -152,6 +176,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
FromSubxt::Subscription {
|
||||
method,
|
||||
unsubscribe_method,
|
||||
params,
|
||||
sub_id,
|
||||
sender,
|
||||
@@ -166,8 +191,15 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
);
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}");
|
||||
let subscription_id_state = PendingSubscription {
|
||||
sub_id_sender: sub_id,
|
||||
subscription_state: ActiveSubscription {
|
||||
sender,
|
||||
unsubscribe_method,
|
||||
},
|
||||
};
|
||||
self.id_to_subscription
|
||||
.insert((id, chain_id), (sub_id, sender));
|
||||
.insert((id, chain_id), subscription_id_state);
|
||||
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
if let Err(err) = result {
|
||||
@@ -176,13 +208,14 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
"Cannot send RPC request to lightclient {:?}",
|
||||
err.to_string()
|
||||
);
|
||||
let (sub_id, _) = self
|
||||
let subscription_id_state = self
|
||||
.id_to_subscription
|
||||
.remove(&(id, chain_id))
|
||||
.expect("Channels are inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
if sub_id
|
||||
if subscription_id_state
|
||||
.sub_id_sender
|
||||
.send(Err(LightClientRpcError::Request(err.to_string())))
|
||||
.is_err()
|
||||
{
|
||||
@@ -219,10 +252,11 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some((sub_id_sender, _)) =
|
||||
} else if let Some(subscription_id_state) =
|
||||
self.id_to_subscription.remove(&(id, chain_id))
|
||||
{
|
||||
if sub_id_sender
|
||||
if subscription_id_state
|
||||
.sub_id_sender
|
||||
.send(Err(LightClientRpcError::Request(error.to_string())))
|
||||
.is_err()
|
||||
{
|
||||
@@ -247,7 +281,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some((sub_id_sender, sender)) =
|
||||
} else if let Some(pending_subscription) =
|
||||
self.id_to_subscription.remove(&(id, chain_id))
|
||||
{
|
||||
let Ok(sub_id) = result
|
||||
@@ -265,15 +299,19 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
|
||||
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");
|
||||
|
||||
let (sub_id_sender, active_subscription) = pending_subscription.into_parts();
|
||||
if sub_id_sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
} else {
|
||||
// Track this subscription ID if send is successful.
|
||||
self.subscriptions.insert((sub_id, chain_id), sender);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Track this subscription ID if send is successful.
|
||||
self.subscriptions
|
||||
.insert((sub_id, chain_id), active_subscription);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
@@ -287,22 +325,37 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(sender) = self.subscriptions.get_mut(&(id, chain_id)) {
|
||||
// Send the current notification response.
|
||||
if sender.send(result).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send notification to subscription id={id} chain={chain_id:?} method={method}",
|
||||
);
|
||||
|
||||
// Remove the sender if the subscription dropped the receiver.
|
||||
self.subscriptions.remove(&(id, chain_id));
|
||||
}
|
||||
} else {
|
||||
let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription response id={id} chain={chain_id:?} is not tracked",
|
||||
"Subscription response id={id} chain={chain_id:?} method={method} is not tracked",
|
||||
);
|
||||
return;
|
||||
};
|
||||
if subscription_state.sender.send(result).is_ok() {
|
||||
// Nothing else to do, user is informed about the notification.
|
||||
return;
|
||||
}
|
||||
|
||||
// User dropped the receiver, unsubscribe from the method and remove internal tracking.
|
||||
let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else {
|
||||
// State is checked to be some above, so this should never happen.
|
||||
return;
|
||||
};
|
||||
// Make a call to unsubscribe from this method.
|
||||
let unsub_id = self.next_id(chain_id);
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
|
||||
unsub_id, subscription_state.unsubscribe_method, id
|
||||
);
|
||||
|
||||
if let Err(err) = self.client.json_rpc_request(request, chain_id) {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Failed to unsubscribe id={id:?} chain={chain_id:?} method={:?} err={err:?}", subscription_state.unsubscribe_method
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(target: LOG_TARGET,"Unsubscribe id={id:?} chain={chain_id:?} method={:?}", subscription_state.unsubscribe_method);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
|
||||
@@ -179,6 +179,7 @@ impl LightClientRpc {
|
||||
&self,
|
||||
method: String,
|
||||
params: String,
|
||||
unsubscribe_method: String,
|
||||
) -> Result<
|
||||
(
|
||||
oneshot::Receiver<MethodResponse>,
|
||||
@@ -191,6 +192,7 @@ impl LightClientRpc {
|
||||
|
||||
self.to_backend.send(FromSubxt::Subscription {
|
||||
method,
|
||||
unsubscribe_method,
|
||||
params,
|
||||
sub_id,
|
||||
sender,
|
||||
|
||||
@@ -106,7 +106,7 @@ impl RpcClientT for LightClientRpc {
|
||||
&'a self,
|
||||
sub: &'a str,
|
||||
params: Option<Box<RawValue>>,
|
||||
_unsub: &'a str,
|
||||
unsub: &'a str,
|
||||
) -> RawRpcFuture<'a, RawRpcSubscription> {
|
||||
let client = self.clone();
|
||||
let chain_id = self.chain_id();
|
||||
@@ -130,7 +130,7 @@ impl RpcClientT for LightClientRpc {
|
||||
// Fails if the background is closed.
|
||||
let (sub_id, notif) = client
|
||||
.0
|
||||
.subscription_request(sub.to_string(), params)
|
||||
.subscription_request(sub.to_string(), params, unsub.to_string())
|
||||
.map_err(|_| RpcError::ClientError(Box::new(LightClientError::BackgroundClosed)))?;
|
||||
|
||||
// Fails if the background is closed.
|
||||
|
||||
Reference in New Issue
Block a user