mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-06-12 15:51:12 +00:00
[lightclient] lookup chain ID just once at beginning of each message (#1452)
* lookup chain ID just once at beginning of each message * data => chain_data for more clarity * cargo fmt
This commit is contained in:
@@ -61,22 +61,37 @@ pub enum FromSubxt {
|
||||
pub struct BackgroundTask<TPlatform: PlatformRef, TChain> {
|
||||
/// Smoldot light client implementation that leverages the exposed platform.
|
||||
client: smoldot_light::Client<TPlatform, TChain>,
|
||||
/// Per-chain data.
|
||||
chain_data: HashMap<smoldot_light::ChainId, ChainData>,
|
||||
}
|
||||
|
||||
/// The data that we store for each chain.
|
||||
#[derive(Default)]
|
||||
struct ChainData {
|
||||
/// Generates an unique monotonically increasing ID for each chain.
|
||||
request_id_per_chain: HashMap<smoldot_light::ChainId, usize>,
|
||||
last_request_id: usize,
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
requests: HashMap<(usize, smoldot_light::ChainId), oneshot::Sender<MethodResponse>>,
|
||||
requests: HashMap<usize, oneshot::Sender<MethodResponse>>,
|
||||
/// Subscription calls first need to make a plain RPC method
|
||||
/// request to obtain the subscription ID.
|
||||
///
|
||||
/// The RPC method request is made in the background and the response should
|
||||
/// not be sent back to the user.
|
||||
/// Map the request ID of a RPC method to the frontend `Sender`.
|
||||
id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>,
|
||||
id_to_subscription: HashMap<usize, PendingSubscription>,
|
||||
/// Map the subscription ID to the frontend `Sender`.
|
||||
///
|
||||
/// The subscription ID is entirely generated by the node (smoldot). Therefore, it is
|
||||
/// possible for two distinct subscriptions of different chains to have the same subscription ID.
|
||||
subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>,
|
||||
subscriptions: HashMap<usize, ActiveSubscription>,
|
||||
}
|
||||
|
||||
impl ChainData {
|
||||
/// Fetch and increment the request ID.
|
||||
fn next_id(&mut self) -> usize {
|
||||
self.last_request_id = self.last_request_id.wrapping_add(1);
|
||||
self.last_request_id
|
||||
}
|
||||
}
|
||||
|
||||
/// The state needed to resolve the subscription ID and send
|
||||
@@ -114,19 +129,20 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
) -> BackgroundTask<TPlatform, TChain> {
|
||||
BackgroundTask {
|
||||
client,
|
||||
request_id_per_chain: Default::default(),
|
||||
requests: Default::default(),
|
||||
id_to_subscription: Default::default(),
|
||||
subscriptions: Default::default(),
|
||||
chain_data: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch and increment the request ID.
|
||||
fn next_id(&mut self, chain_id: smoldot_light::ChainId) -> usize {
|
||||
let next = self.request_id_per_chain.entry(chain_id).or_insert(1);
|
||||
let id = *next;
|
||||
*next = next.wrapping_add(1);
|
||||
id
|
||||
fn for_chain_id(
|
||||
&mut self,
|
||||
chain_id: smoldot_light::ChainId,
|
||||
) -> (
|
||||
&mut ChainData,
|
||||
&mut smoldot_light::Client<TPlatform, TChain>,
|
||||
) {
|
||||
let chain_data = self.chain_data.entry(chain_id).or_default();
|
||||
let client = &mut self.client;
|
||||
(chain_data, client)
|
||||
}
|
||||
|
||||
/// Handle the registration messages received from the user.
|
||||
@@ -138,16 +154,18 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
sender,
|
||||
chain_id,
|
||||
} => {
|
||||
let id = self.next_id(chain_id);
|
||||
let (chain_data, client) = self.for_chain_id(chain_id);
|
||||
let id = chain_data.next_id();
|
||||
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
|
||||
id, method, params
|
||||
);
|
||||
|
||||
self.requests.insert((id, chain_id), sender);
|
||||
chain_data.requests.insert(id, sender);
|
||||
tracing::trace!(target: LOG_TARGET, "Tracking request id={id} chain={chain_id:?}");
|
||||
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
let result = client.json_rpc_request(request, chain_id);
|
||||
if let Err(err) = result {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
@@ -155,9 +173,9 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
err.to_string()
|
||||
);
|
||||
|
||||
let sender = self
|
||||
let sender = chain_data
|
||||
.requests
|
||||
.remove(&(id, chain_id))
|
||||
.remove(&id)
|
||||
.expect("Channel is inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
@@ -182,9 +200,11 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
sender,
|
||||
chain_id,
|
||||
} => {
|
||||
let (chain_data, client) = self.for_chain_id(chain_id);
|
||||
let id = chain_data.next_id();
|
||||
|
||||
// For subscriptions we need to make a plain RPC request to the subscription method.
|
||||
// The server will return as a result the subscription ID.
|
||||
let id = self.next_id(chain_id);
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":{}}}"#,
|
||||
id, method, params
|
||||
@@ -198,19 +218,20 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
unsubscribe_method,
|
||||
},
|
||||
};
|
||||
self.id_to_subscription
|
||||
.insert((id, chain_id), subscription_id_state);
|
||||
chain_data
|
||||
.id_to_subscription
|
||||
.insert(id, subscription_id_state);
|
||||
|
||||
let result = self.client.json_rpc_request(request, chain_id);
|
||||
let result = client.json_rpc_request(request, chain_id);
|
||||
if let Err(err) = result {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send RPC request to lightclient {:?}",
|
||||
err.to_string()
|
||||
);
|
||||
let subscription_id_state = self
|
||||
let subscription_id_state = chain_data
|
||||
.id_to_subscription
|
||||
.remove(&(id, chain_id))
|
||||
.remove(&id)
|
||||
.expect("Channels are inserted above; qed");
|
||||
|
||||
// Send the error back to frontend.
|
||||
@@ -234,6 +255,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
/// Parse the response received from the light client and sent it to the appropriate user.
|
||||
fn handle_rpc_response(&mut self, chain_id: smoldot_light::ChainId, response: String) {
|
||||
tracing::trace!(target: LOG_TARGET, "Received from smoldot response={response} chain={chain_id:?}");
|
||||
let (chain_data, _client) = self.for_chain_id(chain_id);
|
||||
|
||||
match RpcResponse::from_str(&response) {
|
||||
Ok(RpcResponse::Error { id, error }) => {
|
||||
@@ -242,7 +264,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
return;
|
||||
};
|
||||
|
||||
if let Some(sender) = self.requests.remove(&(id, chain_id)) {
|
||||
if let Some(sender) = chain_data.requests.remove(&id) {
|
||||
if sender
|
||||
.send(Err(LightClientRpcError::Request(error.to_string())))
|
||||
.is_err()
|
||||
@@ -253,7 +275,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
);
|
||||
}
|
||||
} else if let Some(subscription_id_state) =
|
||||
self.id_to_subscription.remove(&(id, chain_id))
|
||||
chain_data.id_to_subscription.remove(&id)
|
||||
{
|
||||
if subscription_id_state
|
||||
.sub_id_sender
|
||||
@@ -274,15 +296,14 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
};
|
||||
|
||||
// Send the response back.
|
||||
if let Some(sender) = self.requests.remove(&(id, chain_id)) {
|
||||
if let Some(sender) = chain_data.requests.remove(&id) {
|
||||
if sender.send(Ok(result)).is_err() {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Cannot send method response to id={id} chain={chain_id:?}",
|
||||
);
|
||||
}
|
||||
} else if let Some(pending_subscription) =
|
||||
self.id_to_subscription.remove(&(id, chain_id))
|
||||
} else if let Some(pending_subscription) = chain_data.id_to_subscription.remove(&id)
|
||||
{
|
||||
let Ok(sub_id) = result
|
||||
.get()
|
||||
@@ -310,8 +331,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
|
||||
// Track this subscription ID if send is successful.
|
||||
self.subscriptions
|
||||
.insert((sub_id, chain_id), active_subscription);
|
||||
chain_data.subscriptions.insert(sub_id, active_subscription);
|
||||
} else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
@@ -325,7 +345,7 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
return;
|
||||
};
|
||||
|
||||
let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else {
|
||||
let Some(subscription_state) = chain_data.subscriptions.get_mut(&id) else {
|
||||
tracing::warn!(
|
||||
target: LOG_TARGET,
|
||||
"Subscription response id={id} chain={chain_id:?} method={method} is not tracked",
|
||||
@@ -338,12 +358,12 @@ impl<TPlatform: PlatformRef, TChain> BackgroundTask<TPlatform, TChain> {
|
||||
}
|
||||
|
||||
// User dropped the receiver, unsubscribe from the method and remove internal tracking.
|
||||
let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else {
|
||||
let Some(subscription_state) = chain_data.subscriptions.remove(&id) else {
|
||||
// State is checked to be some above, so this should never happen.
|
||||
return;
|
||||
};
|
||||
// Make a call to unsubscribe from this method.
|
||||
let unsub_id = self.next_id(chain_id);
|
||||
let unsub_id = chain_data.next_id();
|
||||
let request = format!(
|
||||
r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#,
|
||||
unsub_id, subscription_state.unsubscribe_method, id
|
||||
|
||||
Reference in New Issue
Block a user