From aaace2c41d23811a1524c463a502b66da50ae192 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 18 May 2023 20:03:51 +0300 Subject: [PATCH] lightclient: Add background task to manage RPC responses Signed-off-by: Alexandru Vasile --- subxt/src/rpc/lightclient/background.rs | 275 ++++++++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 subxt/src/rpc/lightclient/background.rs diff --git a/subxt/src/rpc/lightclient/background.rs b/subxt/src/rpc/lightclient/background.rs new file mode 100644 index 0000000000..1bd0a23aa3 --- /dev/null +++ b/subxt/src/rpc/lightclient/background.rs @@ -0,0 +1,275 @@ +use futures::stream::StreamExt; +use futures_util::future::{self, Either}; +use serde::Deserialize; +use serde_json::value::RawValue; +use std::{collections::HashMap, str::FromStr}; +use tokio::sync::{mpsc, oneshot}; + +///! The background task of the light client. + +/// The number of notifications that are buffered, before the user +/// registers to [`LightClientInner::register_subscription`]. +/// Only the first `BUFFER_NUM_NOTIFICATIONS` are buffered, while the +/// others are ignored. +/// +/// In practice, the Light Client produces notifications at a lower rate +/// than the substrate full node. +const BUFFER_NUM_NOTIFICATIONS: usize = 16; + +const LOG_TARGET: &str = "light-client-background"; + +/// Message protocol between the front-end client that submits the RPC requests +/// and the backend handler that produces responses from the chain. +/// +/// The light client uses a single object [`smoldot_light::JsonRpcResponses`] to +/// handle all requests and subscriptions from a chain. A background task is spawned +/// to multiplex the rpc responses and to provide them back to their rightful submitters. +/// +/// This presumes that the request ID for both method calls and subscriptions is unique. +#[derive(Debug)] +pub enum BackendMessage { + /// The RPC method request. + Request { + /// ID of the request, needed to match the result. + id: String, + /// Channel used to send back the result. + sender: oneshot::Sender>, + }, + /// The RPC subscription (pub/sub) request. + Subscription { + /// ID of the subscription, needed to match the result. + id: String, + /// Channel used to send back the notifcations. + sender: mpsc::Sender>, + }, +} + +/// Background task data. +#[derive(Default)] +pub struct BackgroundTask { + /// Map the request ID of a RPC method to the frontend `Sender`. + requests: HashMap>>, + /// Map the subscription ID to the frontend `Sender`. + subscriptions: HashMap>>, + /// Notifications are cached for users that did not subscribe yet. + subscriptions_cache: HashMap>>, +} + +impl BackgroundTask { + /// Constructs a new [`BackgroundTask`]. + pub fn new() -> BackgroundTask { + BackgroundTask::default() + } + + /// Handle the registration messages received from the user. + async fn handle_register(&mut self, message: BackendMessage) { + match message { + BackendMessage::Request { id, sender } => { + self.requests.insert(id, sender); + } + BackendMessage::Subscription { id, sender } => { + // Drain the subscription cache, that holds messages that + // were not propagated to this subscription yet (because the + // RPC server produced a notification before the user registered + // to receive notifications). + if let Some(cached_responses) = self.subscriptions_cache.remove(&id) { + tracing::debug!(target: LOG_TARGET, "Some messages were cached before susbcribing"); + + for response in cached_responses { + if sender.send(response).await.is_err() { + tracing::warn!(target: LOG_TARGET, "Cannot send notification to susbcription {:?}", id); + } + } + } + + self.subscriptions.insert(id, sender); + } + }; + } + + /// Parse the response received from the light client and sent it to the appropriate user. + async fn handle_rpc_response(&mut self, response: String) { + match RpcResponse::from_str(&response) { + Ok(RpcResponse::Method { id, result }) => { + // Send the response back. + if let Some(sender) = self.requests.remove(&id) { + if sender.send(result).is_err() { + tracing::warn!(target: LOG_TARGET, " Cannot send method response to id {:?}", id); + } + } + } + Ok(RpcResponse::Subscription { method, id, result }) => { + // Subxt calls into `author_submitAndWatchExtrinsic`, however the smoldot produces + // `{"event":"broadcasted","numPeers":1}` which is part of the RPC V2 API. Ignore + // this spurious event. + if method == "transaction_unstable_watchEvent" + && result.to_string().contains("broadcasted") + { + tracing::debug!(target: LOG_TARGET, "Ignoring notification {:?}", result); + return; + } + + if let Some(sender) = self.subscriptions.get_mut(&id) { + // Send the current notification response. + if sender.send(result).await.is_err() { + tracing::warn!(target: LOG_TARGET, "Cannot send notification to susbcription {:?}", id); + } + return; + } + + // Subscription ID not registered yet, cache the response. + // Note: Compiler complains about moving the `result` for `.entry.and_modify(..).or_insert(..)`, + // because it sees it as used on both closures and apparently cannot determine that only one + // closure can be executed. + match self.subscriptions_cache.entry(id) { + std::collections::hash_map::Entry::Occupied(mut entry) => { + let cached_responses: &mut Vec<_> = entry.get_mut(); + // Do not cache notification if exceeded capacity. + if cached_responses.len() > BUFFER_NUM_NOTIFICATIONS { + return; + } + + cached_responses.push(result); + } + std::collections::hash_map::Entry::Vacant(entry) => { + let mut vec = Vec::with_capacity(BUFFER_NUM_NOTIFICATIONS); + vec.push(result); + entry.insert(vec); + } + }; + } + Err(err) => { + tracing::warn!(target: LOG_TARGET, "annot decode RPC response {:?}", err); + } + } + } + + /// Perform the main background task: + /// - receiving user registration for RPC method / subscriptions + /// - providing the results from the light client back to users. + pub async fn start_task( + &mut self, + backend: mpsc::Receiver, + rpc_responses: smoldot_light::JsonRpcResponses, + ) { + let backend_event = tokio_stream::wrappers::ReceiverStream::new(backend); + let rpc_responses_event = + futures_util::stream::unfold(rpc_responses, |mut rpc_responses| async { + rpc_responses + .next() + .await + .map(|result| (result, rpc_responses)) + }); + + tokio::pin!(backend_event, rpc_responses_event); + + let mut backend_event_fut = backend_event.next(); + let mut rpc_responses_fut = rpc_responses_event.next(); + + loop { + match future::select(backend_event_fut, rpc_responses_fut).await { + // Message received from the backend: user registered. + Either::Left((backend_value, previous_fut)) => { + let Some(message) = backend_value else { + tracing::trace!(target: LOG_TARGET, "Frontend channel closed"); + break; + }; + tracing::trace!(target: LOG_TARGET, "Received register message {:?}", message); + + self.handle_register(message).await; + + backend_event_fut = backend_event.next(); + rpc_responses_fut = previous_fut; + } + // Message received from rpc handler: lightclient response. + Either::Right((response, previous_fut)) => { + // Smoldot returns `None` if the chain has been removed (which subxt does not remove). + let Some(response) = response else { + tracing::trace!(target: LOG_TARGET, "Smoldot RPC responses channel closed"); + break; + }; + tracing::trace!(target: LOG_TARGET, "Received smoldot RPC result {:?}", response); + + self.handle_rpc_response(response).await; + + // Advance backend, save frontend. + backend_event_fut = previous_fut; + rpc_responses_fut = rpc_responses_event.next(); + } + } + } + + tracing::trace!(target: LOG_TARGET, "Task closed"); + } +} + +/// The RPC response from the light-client. +/// This can either be a response of a method, or a notification from a subscription. +#[derive(Debug, Clone)] +enum RpcResponse { + Method { + /// Response ID. + id: String, + /// The result of the method call. + result: Box, + }, + Subscription { + /// RPC method that generated the notification. + method: String, + /// Subscription ID. + id: String, + /// Result. + result: Box, + }, +} + +impl std::str::FromStr for RpcResponse { + type Err = serde_json::Error; + + fn from_str(response: &str) -> Result { + // Helper structures to deserialize from raw RPC strings. + #[derive(Deserialize, Debug)] + struct Response { + /// JSON-RPC version. + #[allow(unused)] + jsonrpc: String, + /// Result. + result: Box, + /// Request ID + id: String, + } + #[derive(Deserialize)] + struct NotificationParams { + /// The ID of the subscription. + subscription: String, + /// Result. + result: Box, + } + #[derive(Deserialize)] + struct ResponseNotification { + /// JSON-RPC version. + #[allow(unused)] + jsonrpc: String, + /// RPC method that generated the notification. + method: String, + /// Result. + params: NotificationParams, + } + + // Check if the response can be mapped as an RPC method response. + let result: Result = serde_json::from_str(&response); + if let Ok(response) = result { + return Ok(RpcResponse::Method { + id: response.id, + result: response.result, + }); + } + + let notification: ResponseNotification = serde_json::from_str(&response)?; + Ok(RpcResponse::Subscription { + id: notification.params.subscription, + method: notification.method, + result: notification.params.result, + }) + } +}