From d96046cda6d0155b3859b15fd59cfa57482f639c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 29 Jan 2024 09:11:51 +0100 Subject: [PATCH 1/4] build(deps): bump serde from 1.0.195 to 1.0.196 (#1406) Bumps [serde](https://github.com/serde-rs/serde) from 1.0.195 to 1.0.196. - [Release notes](https://github.com/serde-rs/serde/releases) - [Commits](https://github.com/serde-rs/serde/compare/v1.0.195...v1.0.196) --- updated-dependencies: - dependency-name: serde dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7475c4a557..d4bc7da3e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3700,9 +3700,9 @@ checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" [[package]] name = "serde" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" dependencies = [ "serde_derive", ] @@ -3718,9 +3718,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 1a5a07ade5..66f30759a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ scale-value = "0.13.0" scale-bits = "0.4.0" scale-decode = "0.10.0" scale-encode = "0.5.0" -serde = { version = "1.0.195" } +serde = { version = "1.0.196" } serde_json = { version = "1.0.108" } syn = { version = "2.0.15", features = ["full", "extra-traits"] } thiserror = "1.0.53" From e7e9b5720455209752462e7b028b7042f4d833f1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 29 Jan 2024 09:13:01 +0100 Subject: [PATCH 2/4] build(deps): bump pin-project from 1.1.3 to 1.1.4 (#1404) Bumps [pin-project](https://github.com/taiki-e/pin-project) from 1.1.3 to 1.1.4. - [Release notes](https://github.com/taiki-e/pin-project/releases) - [Changelog](https://github.com/taiki-e/pin-project/blob/main/CHANGELOG.md) - [Commits](https://github.com/taiki-e/pin-project/compare/v1.1.3...v1.1.4) --- updated-dependencies: - dependency-name: pin-project dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 8 ++++---- Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d4bc7da3e8..892397e0c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2865,18 +2865,18 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +checksum = "0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 66f30759a8..dca3d5a3f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,7 @@ smoldot-light = { version = "0.14.0", default-features = false } tokio-stream = "0.1.14" futures-util = "0.3.30" rand = "0.8.5" -pin-project = "1.1.3" +pin-project = "1.1.4" # Light client wasm: web-sys = { version = "0.3.67", features = ["BinaryType", "CloseEvent", "MessageEvent", "WebSocket"] } From 0897459928cd77aba5f8f90442f67f3a2271f1be Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 29 Jan 2024 11:40:08 +0200 Subject: [PATCH 3/4] build(deps): bump serde_json from 1.0.111 to 1.0.113 (#1407) Bumps [serde_json](https://github.com/serde-rs/json) from 1.0.111 to 1.0.113. - [Release notes](https://github.com/serde-rs/json/releases) - [Commits](https://github.com/serde-rs/json/compare/v1.0.111...v1.0.113) --- updated-dependencies: - dependency-name: serde_json dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 892397e0c6..7f72441dda 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3729,9 +3729,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.111" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", diff --git a/Cargo.toml b/Cargo.toml index dca3d5a3f4..16bfeb416a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,7 +84,7 @@ scale-bits = "0.4.0" scale-decode = "0.10.0" scale-encode = "0.5.0" serde = { version = "1.0.196" } -serde_json = { version = "1.0.108" } +serde_json = { version = "1.0.113" } syn = { version = "2.0.15", features = ["full", "extra-traits"] } thiserror = "1.0.53" tokio = { version = "1.35", default-features = false } From 7762da8650aeaff829d02a6bcf82fb5c6eb3d081 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 30 Jan 2024 13:47:42 +0200 Subject: [PATCH 4/4] LightClient: Unsubscribe from subscriptions (#1408) * lightclient: Refactor the background protocol to receive unsub method Signed-off-by: Alexandru Vasile * light-client: Unsubscribe if the user dropped the subscription Signed-off-by: Alexandru Vasile * lightclient: Refactor background task for borrow-checker Signed-off-by: Alexandru Vasile * lightclient/rpc: Pass the unsub method name to the background task Signed-off-by: Alexandru Vasile * lightclient: Unsubscribe with subscription ID as param Signed-off-by: Alexandru Vasile * lightclient: Rename subscription states Signed-off-by: Alexandru Vasile --------- Signed-off-by: Alexandru Vasile --- lightclient/src/background.rs | 113 ++++++++++++++++++++------- lightclient/src/client.rs | 2 + subxt/src/client/light_client/rpc.rs | 4 +- 3 files changed, 87 insertions(+), 32 deletions(-) diff --git a/lightclient/src/background.rs b/lightclient/src/background.rs index 1e34596d9e..01614d202e 100644 --- a/lightclient/src/background.rs +++ b/lightclient/src/background.rs @@ -43,6 +43,8 @@ pub enum FromSubxt { Subscription { /// The method of the request. method: String, + /// The method to unsubscribe. + unsubscribe_method: String, /// The parameters of the request. params: String, /// Channel used to send back the subscription ID if successful. @@ -69,18 +71,40 @@ pub struct BackgroundTask { /// The RPC method request is made in the background and the response should /// not be sent back to the user. /// Map the request ID of a RPC method to the frontend `Sender`. - id_to_subscription: HashMap< - (usize, smoldot_light::ChainId), - ( - oneshot::Sender, - mpsc::UnboundedSender>, - ), - >, + id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>, /// Map the subscription ID to the frontend `Sender`. /// /// The subscription ID is entirely generated by the node (smoldot). Therefore, it is /// possible for two distinct subscriptions of different chains to have the same subscription ID. - subscriptions: HashMap<(usize, smoldot_light::ChainId), mpsc::UnboundedSender>>, + subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>, +} + +/// The state needed to resolve the subscription ID and send +/// back the response to frontend. +struct PendingSubscription { + /// Send the method response ID back to the user. + /// + /// It contains the subscription ID if successful, or an JSON RPC error object. + sub_id_sender: oneshot::Sender, + /// The subscription state that is added to the `subscriptions` map only + /// if the subscription ID is successfully sent back to the user. + subscription_state: ActiveSubscription, +} + +impl PendingSubscription { + /// Transforms the pending subscription into an active subscription. + fn into_parts(self) -> (oneshot::Sender, ActiveSubscription) { + (self.sub_id_sender, self.subscription_state) + } +} + +/// The state of the subscription. +struct ActiveSubscription { + /// Channel to send the subscription notifications back to frontend. + sender: mpsc::UnboundedSender>, + /// The unsubscribe method to call when the user drops the receiver + /// part of the channel. + unsubscribe_method: String, } impl BackgroundTask { @@ -152,6 +176,7 @@ impl BackgroundTask { } FromSubxt::Subscription { method, + unsubscribe_method, params, sub_id, sender, @@ -166,8 +191,15 @@ impl BackgroundTask { ); tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}"); + let subscription_id_state = PendingSubscription { + sub_id_sender: sub_id, + subscription_state: ActiveSubscription { + sender, + unsubscribe_method, + }, + }; self.id_to_subscription - .insert((id, chain_id), (sub_id, sender)); + .insert((id, chain_id), subscription_id_state); let result = self.client.json_rpc_request(request, chain_id); if let Err(err) = result { @@ -176,13 +208,14 @@ impl BackgroundTask { "Cannot send RPC request to lightclient {:?}", err.to_string() ); - let (sub_id, _) = self + let subscription_id_state = self .id_to_subscription .remove(&(id, chain_id)) .expect("Channels are inserted above; qed"); // Send the error back to frontend. - if sub_id + if subscription_id_state + .sub_id_sender .send(Err(LightClientRpcError::Request(err.to_string()))) .is_err() { @@ -219,10 +252,11 @@ impl BackgroundTask { "Cannot send method response to id={id} chain={chain_id:?}", ); } - } else if let Some((sub_id_sender, _)) = + } else if let Some(subscription_id_state) = self.id_to_subscription.remove(&(id, chain_id)) { - if sub_id_sender + if subscription_id_state + .sub_id_sender .send(Err(LightClientRpcError::Request(error.to_string()))) .is_err() { @@ -247,7 +281,7 @@ impl BackgroundTask { "Cannot send method response to id={id} chain={chain_id:?}", ); } - } else if let Some((sub_id_sender, sender)) = + } else if let Some(pending_subscription) = self.id_to_subscription.remove(&(id, chain_id)) { let Ok(sub_id) = result @@ -265,15 +299,19 @@ impl BackgroundTask { tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}"); + let (sub_id_sender, active_subscription) = pending_subscription.into_parts(); if sub_id_sender.send(Ok(result)).is_err() { tracing::warn!( target: LOG_TARGET, "Cannot send method response to id={id} chain={chain_id:?}", ); - } else { - // Track this subscription ID if send is successful. - self.subscriptions.insert((sub_id, chain_id), sender); + + return; } + + // Track this subscription ID if send is successful. + self.subscriptions + .insert((sub_id, chain_id), active_subscription); } else { tracing::warn!( target: LOG_TARGET, @@ -287,22 +325,37 @@ impl BackgroundTask { return; }; - if let Some(sender) = self.subscriptions.get_mut(&(id, chain_id)) { - // Send the current notification response. - if sender.send(result).is_err() { - tracing::warn!( - target: LOG_TARGET, - "Cannot send notification to subscription id={id} chain={chain_id:?} method={method}", - ); - - // Remove the sender if the subscription dropped the receiver. - self.subscriptions.remove(&(id, chain_id)); - } - } else { + let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else { tracing::warn!( target: LOG_TARGET, - "Subscription response id={id} chain={chain_id:?} is not tracked", + "Subscription response id={id} chain={chain_id:?} method={method} is not tracked", ); + return; + }; + if subscription_state.sender.send(result).is_ok() { + // Nothing else to do, user is informed about the notification. + return; + } + + // User dropped the receiver, unsubscribe from the method and remove internal tracking. + let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else { + // State is checked to be some above, so this should never happen. + return; + }; + // Make a call to unsubscribe from this method. + let unsub_id = self.next_id(chain_id); + let request = format!( + r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#, + unsub_id, subscription_state.unsubscribe_method, id + ); + + if let Err(err) = self.client.json_rpc_request(request, chain_id) { + tracing::warn!( + target: LOG_TARGET, + "Failed to unsubscribe id={id:?} chain={chain_id:?} method={:?} err={err:?}", subscription_state.unsubscribe_method + ); + } else { + tracing::debug!(target: LOG_TARGET,"Unsubscribe id={id:?} chain={chain_id:?} method={:?}", subscription_state.unsubscribe_method); } } Err(err) => { diff --git a/lightclient/src/client.rs b/lightclient/src/client.rs index 0269986384..f06c968b66 100644 --- a/lightclient/src/client.rs +++ b/lightclient/src/client.rs @@ -179,6 +179,7 @@ impl LightClientRpc { &self, method: String, params: String, + unsubscribe_method: String, ) -> Result< ( oneshot::Receiver, @@ -191,6 +192,7 @@ impl LightClientRpc { self.to_backend.send(FromSubxt::Subscription { method, + unsubscribe_method, params, sub_id, sender, diff --git a/subxt/src/client/light_client/rpc.rs b/subxt/src/client/light_client/rpc.rs index 1186528ec6..ca4fb2f664 100644 --- a/subxt/src/client/light_client/rpc.rs +++ b/subxt/src/client/light_client/rpc.rs @@ -106,7 +106,7 @@ impl RpcClientT for LightClientRpc { &'a self, sub: &'a str, params: Option>, - _unsub: &'a str, + unsub: &'a str, ) -> RawRpcFuture<'a, RawRpcSubscription> { let client = self.clone(); let chain_id = self.chain_id(); @@ -130,7 +130,7 @@ impl RpcClientT for LightClientRpc { // Fails if the background is closed. let (sub_id, notif) = client .0 - .subscription_request(sub.to_string(), params) + .subscription_request(sub.to_string(), params, unsub.to_string()) .map_err(|_| RpcError::ClientError(Box::new(LightClientError::BackgroundClosed)))?; // Fails if the background is closed.