From 7762da8650aeaff829d02a6bcf82fb5c6eb3d081 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Tue, 30 Jan 2024 13:47:42 +0200 Subject: [PATCH] LightClient: Unsubscribe from subscriptions (#1408) * lightclient: Refactor the background protocol to receive unsub method Signed-off-by: Alexandru Vasile * light-client: Unsubscribe if the user dropped the subscription Signed-off-by: Alexandru Vasile * lightclient: Refactor background task for borrow-checker Signed-off-by: Alexandru Vasile * lightclient/rpc: Pass the unsub method name to the background task Signed-off-by: Alexandru Vasile * lightclient: Unsubscribe with subscription ID as param Signed-off-by: Alexandru Vasile * lightclient: Rename subscription states Signed-off-by: Alexandru Vasile --------- Signed-off-by: Alexandru Vasile --- lightclient/src/background.rs | 113 ++++++++++++++++++++------- lightclient/src/client.rs | 2 + subxt/src/client/light_client/rpc.rs | 4 +- 3 files changed, 87 insertions(+), 32 deletions(-) diff --git a/lightclient/src/background.rs b/lightclient/src/background.rs index 1e34596d9e..01614d202e 100644 --- a/lightclient/src/background.rs +++ b/lightclient/src/background.rs @@ -43,6 +43,8 @@ pub enum FromSubxt { Subscription { /// The method of the request. method: String, + /// The method to unsubscribe. + unsubscribe_method: String, /// The parameters of the request. params: String, /// Channel used to send back the subscription ID if successful. @@ -69,18 +71,40 @@ pub struct BackgroundTask { /// The RPC method request is made in the background and the response should /// not be sent back to the user. /// Map the request ID of a RPC method to the frontend `Sender`. - id_to_subscription: HashMap< - (usize, smoldot_light::ChainId), - ( - oneshot::Sender, - mpsc::UnboundedSender>, - ), - >, + id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>, /// Map the subscription ID to the frontend `Sender`. /// /// The subscription ID is entirely generated by the node (smoldot). Therefore, it is /// possible for two distinct subscriptions of different chains to have the same subscription ID. - subscriptions: HashMap<(usize, smoldot_light::ChainId), mpsc::UnboundedSender>>, + subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>, +} + +/// The state needed to resolve the subscription ID and send +/// back the response to frontend. +struct PendingSubscription { + /// Send the method response ID back to the user. + /// + /// It contains the subscription ID if successful, or an JSON RPC error object. + sub_id_sender: oneshot::Sender, + /// The subscription state that is added to the `subscriptions` map only + /// if the subscription ID is successfully sent back to the user. + subscription_state: ActiveSubscription, +} + +impl PendingSubscription { + /// Transforms the pending subscription into an active subscription. + fn into_parts(self) -> (oneshot::Sender, ActiveSubscription) { + (self.sub_id_sender, self.subscription_state) + } +} + +/// The state of the subscription. +struct ActiveSubscription { + /// Channel to send the subscription notifications back to frontend. + sender: mpsc::UnboundedSender>, + /// The unsubscribe method to call when the user drops the receiver + /// part of the channel. + unsubscribe_method: String, } impl BackgroundTask { @@ -152,6 +176,7 @@ impl BackgroundTask { } FromSubxt::Subscription { method, + unsubscribe_method, params, sub_id, sender, @@ -166,8 +191,15 @@ impl BackgroundTask { ); tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}"); + let subscription_id_state = PendingSubscription { + sub_id_sender: sub_id, + subscription_state: ActiveSubscription { + sender, + unsubscribe_method, + }, + }; self.id_to_subscription - .insert((id, chain_id), (sub_id, sender)); + .insert((id, chain_id), subscription_id_state); let result = self.client.json_rpc_request(request, chain_id); if let Err(err) = result { @@ -176,13 +208,14 @@ impl BackgroundTask { "Cannot send RPC request to lightclient {:?}", err.to_string() ); - let (sub_id, _) = self + let subscription_id_state = self .id_to_subscription .remove(&(id, chain_id)) .expect("Channels are inserted above; qed"); // Send the error back to frontend. - if sub_id + if subscription_id_state + .sub_id_sender .send(Err(LightClientRpcError::Request(err.to_string()))) .is_err() { @@ -219,10 +252,11 @@ impl BackgroundTask { "Cannot send method response to id={id} chain={chain_id:?}", ); } - } else if let Some((sub_id_sender, _)) = + } else if let Some(subscription_id_state) = self.id_to_subscription.remove(&(id, chain_id)) { - if sub_id_sender + if subscription_id_state + .sub_id_sender .send(Err(LightClientRpcError::Request(error.to_string()))) .is_err() { @@ -247,7 +281,7 @@ impl BackgroundTask { "Cannot send method response to id={id} chain={chain_id:?}", ); } - } else if let Some((sub_id_sender, sender)) = + } else if let Some(pending_subscription) = self.id_to_subscription.remove(&(id, chain_id)) { let Ok(sub_id) = result @@ -265,15 +299,19 @@ impl BackgroundTask { tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}"); + let (sub_id_sender, active_subscription) = pending_subscription.into_parts(); if sub_id_sender.send(Ok(result)).is_err() { tracing::warn!( target: LOG_TARGET, "Cannot send method response to id={id} chain={chain_id:?}", ); - } else { - // Track this subscription ID if send is successful. - self.subscriptions.insert((sub_id, chain_id), sender); + + return; } + + // Track this subscription ID if send is successful. + self.subscriptions + .insert((sub_id, chain_id), active_subscription); } else { tracing::warn!( target: LOG_TARGET, @@ -287,22 +325,37 @@ impl BackgroundTask { return; }; - if let Some(sender) = self.subscriptions.get_mut(&(id, chain_id)) { - // Send the current notification response. - if sender.send(result).is_err() { - tracing::warn!( - target: LOG_TARGET, - "Cannot send notification to subscription id={id} chain={chain_id:?} method={method}", - ); - - // Remove the sender if the subscription dropped the receiver. - self.subscriptions.remove(&(id, chain_id)); - } - } else { + let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else { tracing::warn!( target: LOG_TARGET, - "Subscription response id={id} chain={chain_id:?} is not tracked", + "Subscription response id={id} chain={chain_id:?} method={method} is not tracked", ); + return; + }; + if subscription_state.sender.send(result).is_ok() { + // Nothing else to do, user is informed about the notification. + return; + } + + // User dropped the receiver, unsubscribe from the method and remove internal tracking. + let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else { + // State is checked to be some above, so this should never happen. + return; + }; + // Make a call to unsubscribe from this method. + let unsub_id = self.next_id(chain_id); + let request = format!( + r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#, + unsub_id, subscription_state.unsubscribe_method, id + ); + + if let Err(err) = self.client.json_rpc_request(request, chain_id) { + tracing::warn!( + target: LOG_TARGET, + "Failed to unsubscribe id={id:?} chain={chain_id:?} method={:?} err={err:?}", subscription_state.unsubscribe_method + ); + } else { + tracing::debug!(target: LOG_TARGET,"Unsubscribe id={id:?} chain={chain_id:?} method={:?}", subscription_state.unsubscribe_method); } } Err(err) => { diff --git a/lightclient/src/client.rs b/lightclient/src/client.rs index 0269986384..f06c968b66 100644 --- a/lightclient/src/client.rs +++ b/lightclient/src/client.rs @@ -179,6 +179,7 @@ impl LightClientRpc { &self, method: String, params: String, + unsubscribe_method: String, ) -> Result< ( oneshot::Receiver, @@ -191,6 +192,7 @@ impl LightClientRpc { self.to_backend.send(FromSubxt::Subscription { method, + unsubscribe_method, params, sub_id, sender, diff --git a/subxt/src/client/light_client/rpc.rs b/subxt/src/client/light_client/rpc.rs index 1186528ec6..ca4fb2f664 100644 --- a/subxt/src/client/light_client/rpc.rs +++ b/subxt/src/client/light_client/rpc.rs @@ -106,7 +106,7 @@ impl RpcClientT for LightClientRpc { &'a self, sub: &'a str, params: Option>, - _unsub: &'a str, + unsub: &'a str, ) -> RawRpcFuture<'a, RawRpcSubscription> { let client = self.clone(); let chain_id = self.chain_id(); @@ -130,7 +130,7 @@ impl RpcClientT for LightClientRpc { // Fails if the background is closed. let (sub_id, notif) = client .0 - .subscription_request(sub.to_string(), params) + .subscription_request(sub.to_string(), params, unsub.to_string()) .map_err(|_| RpcError::ClientError(Box::new(LightClientError::BackgroundClosed)))?; // Fails if the background is closed.