LightClient: Unsubscribe from subscriptions (#1408)

* lightclient: Refactor the background protocol to receive unsub method

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* light-client: Unsubscribe if the user dropped the subscription

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Refactor background task for borrow-checker

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient/rpc: Pass the unsub method name to the background task

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Unsubscribe with subscription ID as param

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

* lightclient: Rename subscription states

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2024-01-30 13:47:42 +02:00
committed by GitHub
parent 0897459928
commit 7762da8650
3 changed files with 87 additions and 32 deletions
+83 -30
View File
@@ -43,6 +43,8 @@ pub enum FromSubxt {
Subscription {
/// The method of the request.
method: String,
/// The method to unsubscribe.
unsubscribe_method: String,
/// The parameters of the request.
params: String,
/// Channel used to send back the subscription ID if successful.
@@ -69,18 +71,40 @@ pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
/// The RPC method request is made in the background and the response should
/// not be sent back to the user.
/// Map the request ID of a RPC method to the frontend `Sender`.
id_to_subscription: HashMap<
(usize, smoldot_light::ChainId),
(
oneshot::Sender<MethodResponse>,
mpsc::UnboundedSender<Box<RawValue>>,
),
>,
id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>,
/// Map the subscription ID to the frontend `Sender`.
///
/// The subscription ID is entirely generated by the node (smoldot). Therefore, it is
/// possible for two distinct subscriptions of different chains to have the same subscription ID.
subscriptions: HashMap<(usize, smoldot_light::ChainId), mpsc::UnboundedSender<Box<RawValue>>>,
subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>,
}
/// The state needed to resolve the subscription ID and send
/// back the response to frontend.
struct PendingSubscription {
/// Send the method response ID back to the user.
///
/// It contains the subscription ID if successful, or an JSON RPC error object.
sub_id_sender: oneshot::Sender<MethodResponse>,
/// The subscription state that is added to the `subscriptions` map only
/// if the subscription ID is successfully sent back to the user.
subscription_state: ActiveSubscription,
}
impl PendingSubscription {
/// Transforms the pending subscription into an active subscription.
fn into_parts(self) -> (oneshot::Sender<MethodResponse>, ActiveSubscription) {
(self.sub_id_sender, self.subscription_state)
}
}
/// The state of the subscription.
struct ActiveSubscription {
/// Channel to send the subscription notifications back to frontend.
sender: mpsc::UnboundedSender<Box<RawValue>>,
/// The unsubscribe method to call when the user drops the receiver
/// part of the channel.
unsubscribe_method: String,
}
impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
@@ -152,6 +176,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
}
FromSubxt::Subscription {
method,
unsubscribe_method,
params,
sub_id,
sender,
@@ -166,8 +191,15 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
);
tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}");
let subscription_id_state = PendingSubscription {
sub_id_sender: sub_id,
subscription_state: ActiveSubscription {
sender,
unsubscribe_method,
},
};
self.id_to_subscription
.insert((id, chain_id), (sub_id, sender));
.insert((id, chain_id), subscription_id_state);
let result = self.client.json_rpc_request(request, chain_id);
if let Err(err) = result {
@@ -176,13 +208,14 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
"Cannot send RPC request to lightclient {:?}",
err.to_string()
);
let (sub_id, _) = self
let subscription_id_state = self
.id_to_subscription
.remove(&(id, chain_id))
.expect("Channels are inserted above; qed");
// Send the error back to frontend.
if sub_id
if subscription_id_state
.sub_id_sender
.send(Err(LightClientRpcError::Request(err.to_string())))
.is_err()
{
@@ -219,10 +252,11 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
"Cannot send method response to id={id} chain={chain_id:?}",
);
}
} else if let Some((sub_id_sender, _)) =
} else if let Some(subscription_id_state) =
self.id_to_subscription.remove(&(id, chain_id))
{
if sub_id_sender
if subscription_id_state
.sub_id_sender
.send(Err(LightClientRpcError::Request(error.to_string())))
.is_err()
{
@@ -247,7 +281,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
"Cannot send method response to id={id} chain={chain_id:?}",
);
}
} else if let Some((sub_id_sender, sender)) =
} else if let Some(pending_subscription) =
self.id_to_subscription.remove(&(id, chain_id))
{
let Ok(sub_id) = result
@@ -265,15 +299,19 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}");
let (sub_id_sender, active_subscription) = pending_subscription.into_parts();
if sub_id_sender.send(Ok(result)).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send method response to id={id} chain={chain_id:?}",
);
} else {
// Track this subscription ID if send is successful.
self.subscriptions.insert((sub_id, chain_id), sender);
return;
}
// Track this subscription ID if send is successful.
self.subscriptions
.insert((sub_id, chain_id), active_subscription);
} else {
tracing::warn!(
target: LOG_TARGET,
@@ -287,22 +325,37 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
return;
};
if let Some(sender) = self.subscriptions.get_mut(&(id, chain_id)) {
// Send the current notification response.
if sender.send(result).is_err() {
tracing::warn!(
target: LOG_TARGET,
"Cannot send notification to subscription id={id} chain={chain_id:?} method={method}",
);
// Remove the sender if the subscription dropped the receiver.
self.subscriptions.remove(&(id, chain_id));
}
} else {
let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else {
tracing::warn!(
target: LOG_TARGET,
"Subscription response id={id} chain={chain_id:?} is not tracked",
"Subscription response id={id} chain={chain_id:?} method={method} is not tracked",
);
return;
};
if subscription_state.sender.send(result).is_ok() {
// Nothing else to do, user is informed about the notification.
return;
}
// User dropped the receiver, unsubscribe from the method and remove internal tracking.
let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else {
// State is checked to be some above, so this should never happen.
return;
};
// Make a call to unsubscribe from this method.
let unsub_id = self.next_id(chain_id);
let request = format!(
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
unsub_id, subscription_state.unsubscribe_method, id
);
if let Err(err) = self.client.json_rpc_request(request, chain_id) {
tracing::warn!(
target: LOG_TARGET,
"Failed to unsubscribe id={id:?} chain={chain_id:?} method={:?} err={err:?}", subscription_state.unsubscribe_method
);
} else {
tracing::debug!(target: LOG_TARGET,"Unsubscribe id={id:?} chain={chain_id:?} method={:?}", subscription_state.unsubscribe_method);
}
}
Err(err) => {
+2
View File
@@ -179,6 +179,7 @@ impl LightClientRpc {
&self,
method: String,
params: String,
unsubscribe_method: String,
) -> Result<
(
oneshot::Receiver<MethodResponse>,
@@ -191,6 +192,7 @@ impl LightClientRpc {
self.to_backend.send(FromSubxt::Subscription {
method,
unsubscribe_method,
params,
sub_id,
sender,
+2 -2
View File
@@ -106,7 +106,7 @@ impl RpcClientT for LightClientRpc {
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
_unsub: &'a str,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
let client = self.clone();
let chain_id = self.chain_id();
@@ -130,7 +130,7 @@ impl RpcClientT for LightClientRpc {
// Fails if the background is closed.
let (sub_id, notif) = client
.0
.subscription_request(sub.to_string(), params)
.subscription_request(sub.to_string(), params, unsub.to_string())
.map_err(|_| RpcError::ClientError(Box::new(LightClientError::BackgroundClosed)))?;
// Fails if the background is closed.