diff --git a/lightclient/src/background.rs b/lightclient/src/background.rs index 01614d202e..b755b383b7 100644 --- a/lightclient/src/background.rs +++ b/lightclient/src/background.rs @@ -61,22 +61,37 @@ pub enum FromSubxt { pub struct BackgroundTask { /// Smoldot light client implementation that leverages the exposed platform. client: smoldot_light::Client, + /// Per-chain data. + chain_data: HashMap, +} + +/// The data that we store for each chain. +#[derive(Default)] +struct ChainData { /// Generates an unique monotonically increasing ID for each chain. - request_id_per_chain: HashMap, + last_request_id: usize, /// Map the request ID of a RPC method to the frontend `Sender`. - requests: HashMap<(usize, smoldot_light::ChainId), oneshot::Sender>, + requests: HashMap>, /// Subscription calls first need to make a plain RPC method /// request to obtain the subscription ID. /// /// The RPC method request is made in the background and the response should /// not be sent back to the user. /// Map the request ID of a RPC method to the frontend `Sender`. - id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>, + id_to_subscription: HashMap, /// Map the subscription ID to the frontend `Sender`. /// /// The subscription ID is entirely generated by the node (smoldot). Therefore, it is /// possible for two distinct subscriptions of different chains to have the same subscription ID. - subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>, + subscriptions: HashMap, +} + +impl ChainData { + /// Fetch and increment the request ID. + fn next_id(&mut self) -> usize { + self.last_request_id = self.last_request_id.wrapping_add(1); + self.last_request_id + } } /// The state needed to resolve the subscription ID and send @@ -114,19 +129,20 @@ impl BackgroundTask { ) -> BackgroundTask { BackgroundTask { client, - request_id_per_chain: Default::default(), - requests: Default::default(), - id_to_subscription: Default::default(), - subscriptions: Default::default(), + chain_data: Default::default(), } } - /// Fetch and increment the request ID. - fn next_id(&mut self, chain_id: smoldot_light::ChainId) -> usize { - let next = self.request_id_per_chain.entry(chain_id).or_insert(1); - let id = *next; - *next = next.wrapping_add(1); - id + fn for_chain_id( + &mut self, + chain_id: smoldot_light::ChainId, + ) -> ( + &mut ChainData, + &mut smoldot_light::Client, + ) { + let chain_data = self.chain_data.entry(chain_id).or_default(); + let client = &mut self.client; + (chain_data, client) } /// Handle the registration messages received from the user. @@ -138,16 +154,18 @@ impl BackgroundTask { sender, chain_id, } => { - let id = self.next_id(chain_id); + let (chain_data, client) = self.for_chain_id(chain_id); + let id = chain_data.next_id(); + let request = format!( r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#, id, method, params ); - self.requests.insert((id, chain_id), sender); + chain_data.requests.insert(id, sender); tracing::trace!(target: LOG_TARGET, "Tracking request id={id} chain={chain_id:?}"); - let result = self.client.json_rpc_request(request, chain_id); + let result = client.json_rpc_request(request, chain_id); if let Err(err) = result { tracing::warn!( target: LOG_TARGET, @@ -155,9 +173,9 @@ impl BackgroundTask { err.to_string() ); - let sender = self + let sender = chain_data .requests - .remove(&(id, chain_id)) + .remove(&id) .expect("Channel is inserted above; qed"); // Send the error back to frontend. @@ -182,9 +200,11 @@ impl BackgroundTask { sender, chain_id, } => { + let (chain_data, client) = self.for_chain_id(chain_id); + let id = chain_data.next_id(); + // For subscriptions we need to make a plain RPC request to the subscription method. // The server will return as a result the subscription ID. - let id = self.next_id(chain_id); let request = format!( r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#, id, method, params @@ -198,19 +218,20 @@ impl BackgroundTask { unsubscribe_method, }, }; - self.id_to_subscription - .insert((id, chain_id), subscription_id_state); + chain_data + .id_to_subscription + .insert(id, subscription_id_state); - let result = self.client.json_rpc_request(request, chain_id); + let result = client.json_rpc_request(request, chain_id); if let Err(err) = result { tracing::warn!( target: LOG_TARGET, "Cannot send RPC request to lightclient {:?}", err.to_string() ); - let subscription_id_state = self + let subscription_id_state = chain_data .id_to_subscription - .remove(&(id, chain_id)) + .remove(&id) .expect("Channels are inserted above; qed"); // Send the error back to frontend. @@ -234,6 +255,7 @@ impl BackgroundTask { /// Parse the response received from the light client and sent it to the appropriate user. fn handle_rpc_response(&mut self, chain_id: smoldot_light::ChainId, response: String) { tracing::trace!(target: LOG_TARGET, "Received from smoldot response={response} chain={chain_id:?}"); + let (chain_data, _client) = self.for_chain_id(chain_id); match RpcResponse::from_str(&response) { Ok(RpcResponse::Error { id, error }) => { @@ -242,7 +264,7 @@ impl BackgroundTask { return; }; - if let Some(sender) = self.requests.remove(&(id, chain_id)) { + if let Some(sender) = chain_data.requests.remove(&id) { if sender .send(Err(LightClientRpcError::Request(error.to_string()))) .is_err() @@ -253,7 +275,7 @@ impl BackgroundTask { ); } } else if let Some(subscription_id_state) = - self.id_to_subscription.remove(&(id, chain_id)) + chain_data.id_to_subscription.remove(&id) { if subscription_id_state .sub_id_sender @@ -274,15 +296,14 @@ impl BackgroundTask { }; // Send the response back. - if let Some(sender) = self.requests.remove(&(id, chain_id)) { + if let Some(sender) = chain_data.requests.remove(&id) { if sender.send(Ok(result)).is_err() { tracing::warn!( target: LOG_TARGET, "Cannot send method response to id={id} chain={chain_id:?}", ); } - } else if let Some(pending_subscription) = - self.id_to_subscription.remove(&(id, chain_id)) + } else if let Some(pending_subscription) = chain_data.id_to_subscription.remove(&id) { let Ok(sub_id) = result .get() @@ -310,8 +331,7 @@ impl BackgroundTask { } // Track this subscription ID if send is successful. - self.subscriptions - .insert((sub_id, chain_id), active_subscription); + chain_data.subscriptions.insert(sub_id, active_subscription); } else { tracing::warn!( target: LOG_TARGET, @@ -325,7 +345,7 @@ impl BackgroundTask { return; }; - let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else { + let Some(subscription_state) = chain_data.subscriptions.get_mut(&id) else { tracing::warn!( target: LOG_TARGET, "Subscription response id={id} chain={chain_id:?} method={method} is not tracked", @@ -338,12 +358,12 @@ impl BackgroundTask { } // User dropped the receiver, unsubscribe from the method and remove internal tracking. - let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else { + let Some(subscription_state) = chain_data.subscriptions.remove(&id) else { // State is checked to be some above, so this should never happen. return; }; // Make a call to unsubscribe from this method. - let unsub_id = self.next_id(chain_id); + let unsub_id = chain_data.next_id(); let request = format!( r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#, unsub_id, subscription_state.unsubscribe_method, id