diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b8e54652a6..635c6ffafe 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -46,10 +46,44 @@ jobs: command: fmt args: --all -- --check + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + + machete: + name: "Check unused dependencies" + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Use substrate-node binary + uses: ./.github/workflows/actions/use-substrate + + - name: Install Rust stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Rust Cache + uses: Swatinem/rust-cache@23bce251a8cd2ffc3c1075eaa2367cf899916d84 # v2.7.3 + + - name: Install cargo-machete + run: cargo install cargo-machete + + - name: Check unused dependencies + uses: actions-rs/cargo@v1.0.3 + with: + command: machete + + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + clippy: name: Cargo clippy runs-on: ubuntu-latest - needs: fmt + needs: [fmt, machete] steps: - name: Checkout sources uses: actions/checkout@v4 @@ -68,16 +102,51 @@ jobs: - name: Rust Cache uses: Swatinem/rust-cache@23bce251a8cd2ffc3c1075eaa2367cf899916d84 # v2.7.3 + - name: Run clippy + run: | + cargo clippy --all-targets --features unstable-light-client -- -D warnings + cargo clippy -p subxt-lightclient --no-default-features --features web -- -D warnings + cargo clippy -p subxt --no-default-features --features web -- -D warnings + cargo clippy -p subxt --no-default-features --features web,unstable-light-client -- -D warnings + + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + + wasm_clippy: + name: Cargo clippy (WASM) + runs-on: ubuntu-latest + needs: [fmt, machete] + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Use substrate-node binary + uses: ./.github/workflows/actions/use-substrate + + - name: Install Rust stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + target: wasm32-unknown-unknown + override: true + + - name: Rust Cache + uses: Swatinem/rust-cache@23bce251a8cd2ffc3c1075eaa2367cf899916d84 # v2.7.3 + - name: Run clippy uses: actions-rs/cargo@v1 with: command: clippy - args: --all-targets -- -D warnings + args: -p subxt --no-default-features --features web,unstable-light-client,jsonrpsee --target wasm32-unknown-unknown -- -D warnings + + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 check: name: Cargo check runs-on: ubuntu-latest - needs: [fmt, clippy] + needs: [fmt, machete] steps: - name: Checkout sources uses: actions/checkout@v4 @@ -133,10 +202,13 @@ jobs: - name: Cargo check parachain-example run: cargo check --manifest-path examples/parachain-example/Cargo.toml + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + wasm_check: name: Cargo check (WASM) runs-on: ubuntu-latest - needs: [fmt, clippy] + needs: [fmt, machete] steps: - name: Checkout sources uses: actions/checkout@v4 @@ -157,39 +229,13 @@ jobs: run: | cargo check --manifest-path examples/wasm-example/Cargo.toml --target wasm32-unknown-unknown - machete: - name: "Check unused dependencies" - runs-on: ubuntu-latest - needs: [check, wasm_check] - steps: - - name: Checkout sources - uses: actions/checkout@v4 - - - name: Use substrate-node binary - uses: ./.github/workflows/actions/use-substrate - - - name: Install Rust stable toolchain - uses: actions-rs/toolchain@v1 - with: - profile: minimal - toolchain: stable - override: true - - - name: Rust Cache - uses: Swatinem/rust-cache@23bce251a8cd2ffc3c1075eaa2367cf899916d84 # v2.7.3 - - - name: Install cargo-machete - run: cargo install cargo-machete - - - name: Check unused dependencies - uses: actions-rs/cargo@v1.0.3 - with: - command: machete + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 docs: name: Check documentation and run doc tests runs-on: ubuntu-latest - needs: [check, wasm_check] + needs: [fmt, machete] steps: - name: Checkout sources uses: actions/checkout@v4 @@ -216,10 +262,13 @@ jobs: command: test args: --doc + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + tests: name: "Test (Native)" runs-on: ubuntu-latest-16-cores - needs: [machete, docs] + needs: [clippy, wasm_clippy, check, wasm_check, docs] timeout-minutes: 30 steps: - name: Checkout sources @@ -247,10 +296,13 @@ jobs: command: nextest args: run --workspace + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + unstable_backend_tests: name: "Test (Unstable Backend)" runs-on: ubuntu-latest-16-cores - needs: [machete, docs] + needs: [clippy, wasm_clippy, check, wasm_check, docs] timeout-minutes: 30 steps: - name: Checkout sources @@ -278,10 +330,13 @@ jobs: command: nextest args: run --workspace --features unstable-backend-client + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + light_client_tests: name: "Test (Light Client)" runs-on: ubuntu-latest - needs: [machete, docs] + needs: [clippy, wasm_clippy, check, wasm_check, docs] timeout-minutes: 15 steps: - name: Checkout sources @@ -306,10 +361,13 @@ jobs: command: test args: --release --package integration-tests --features unstable-light-client + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + wasm_tests: name: Test (WASM) runs-on: ubuntu-latest - needs: [machete, docs] + needs: [clippy, wasm_clippy, check, wasm_check, docs] timeout-minutes: 30 env: # Set timeout for wasm tests to be much bigger than the default 20 secs. @@ -359,6 +417,9 @@ jobs: wasm-pack test --headless --chrome working-directory: signer/wasm-tests + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 + no-std-tests: name: "Test (no_std)" runs-on: ubuntu-latest @@ -384,3 +445,6 @@ jobs: run: | cargo run working-directory: testing/no-std-tests + + - if: "failure()" + uses: "andymckay/cancel-action@b9280e3f8986d7a8e91c7462efc0fa318010c8b1" # v0.3 diff --git a/lightclient/src/background.rs b/lightclient/src/background.rs index 1e34596d9e..01614d202e 100644 --- a/lightclient/src/background.rs +++ b/lightclient/src/background.rs @@ -43,6 +43,8 @@ pub enum FromSubxt { Subscription { /// The method of the request. method: String, + /// The method to unsubscribe. + unsubscribe_method: String, /// The parameters of the request. params: String, /// Channel used to send back the subscription ID if successful. @@ -69,18 +71,40 @@ pub struct BackgroundTask { /// The RPC method request is made in the background and the response should /// not be sent back to the user. /// Map the request ID of a RPC method to the frontend `Sender`. - id_to_subscription: HashMap< - (usize, smoldot_light::ChainId), - ( - oneshot::Sender, - mpsc::UnboundedSender>, - ), - >, + id_to_subscription: HashMap<(usize, smoldot_light::ChainId), PendingSubscription>, /// Map the subscription ID to the frontend `Sender`. /// /// The subscription ID is entirely generated by the node (smoldot). Therefore, it is /// possible for two distinct subscriptions of different chains to have the same subscription ID. - subscriptions: HashMap<(usize, smoldot_light::ChainId), mpsc::UnboundedSender>>, + subscriptions: HashMap<(usize, smoldot_light::ChainId), ActiveSubscription>, +} + +/// The state needed to resolve the subscription ID and send +/// back the response to frontend. +struct PendingSubscription { + /// Send the method response ID back to the user. + /// + /// It contains the subscription ID if successful, or an JSON RPC error object. + sub_id_sender: oneshot::Sender, + /// The subscription state that is added to the `subscriptions` map only + /// if the subscription ID is successfully sent back to the user. + subscription_state: ActiveSubscription, +} + +impl PendingSubscription { + /// Transforms the pending subscription into an active subscription. + fn into_parts(self) -> (oneshot::Sender, ActiveSubscription) { + (self.sub_id_sender, self.subscription_state) + } +} + +/// The state of the subscription. +struct ActiveSubscription { + /// Channel to send the subscription notifications back to frontend. + sender: mpsc::UnboundedSender>, + /// The unsubscribe method to call when the user drops the receiver + /// part of the channel. + unsubscribe_method: String, } impl BackgroundTask { @@ -152,6 +176,7 @@ impl BackgroundTask { } FromSubxt::Subscription { method, + unsubscribe_method, params, sub_id, sender, @@ -166,8 +191,15 @@ impl BackgroundTask { ); tracing::trace!(target: LOG_TARGET, "Tracking subscription request id={id} chain={chain_id:?}"); + let subscription_id_state = PendingSubscription { + sub_id_sender: sub_id, + subscription_state: ActiveSubscription { + sender, + unsubscribe_method, + }, + }; self.id_to_subscription - .insert((id, chain_id), (sub_id, sender)); + .insert((id, chain_id), subscription_id_state); let result = self.client.json_rpc_request(request, chain_id); if let Err(err) = result { @@ -176,13 +208,14 @@ impl BackgroundTask { "Cannot send RPC request to lightclient {:?}", err.to_string() ); - let (sub_id, _) = self + let subscription_id_state = self .id_to_subscription .remove(&(id, chain_id)) .expect("Channels are inserted above; qed"); // Send the error back to frontend. - if sub_id + if subscription_id_state + .sub_id_sender .send(Err(LightClientRpcError::Request(err.to_string()))) .is_err() { @@ -219,10 +252,11 @@ impl BackgroundTask { "Cannot send method response to id={id} chain={chain_id:?}", ); } - } else if let Some((sub_id_sender, _)) = + } else if let Some(subscription_id_state) = self.id_to_subscription.remove(&(id, chain_id)) { - if sub_id_sender + if subscription_id_state + .sub_id_sender .send(Err(LightClientRpcError::Request(error.to_string()))) .is_err() { @@ -247,7 +281,7 @@ impl BackgroundTask { "Cannot send method response to id={id} chain={chain_id:?}", ); } - } else if let Some((sub_id_sender, sender)) = + } else if let Some(pending_subscription) = self.id_to_subscription.remove(&(id, chain_id)) { let Ok(sub_id) = result @@ -265,15 +299,19 @@ impl BackgroundTask { tracing::trace!(target: LOG_TARGET, "Received subscription id={sub_id} chain={chain_id:?}"); + let (sub_id_sender, active_subscription) = pending_subscription.into_parts(); if sub_id_sender.send(Ok(result)).is_err() { tracing::warn!( target: LOG_TARGET, "Cannot send method response to id={id} chain={chain_id:?}", ); - } else { - // Track this subscription ID if send is successful. - self.subscriptions.insert((sub_id, chain_id), sender); + + return; } + + // Track this subscription ID if send is successful. + self.subscriptions + .insert((sub_id, chain_id), active_subscription); } else { tracing::warn!( target: LOG_TARGET, @@ -287,22 +325,37 @@ impl BackgroundTask { return; }; - if let Some(sender) = self.subscriptions.get_mut(&(id, chain_id)) { - // Send the current notification response. - if sender.send(result).is_err() { - tracing::warn!( - target: LOG_TARGET, - "Cannot send notification to subscription id={id} chain={chain_id:?} method={method}", - ); - - // Remove the sender if the subscription dropped the receiver. - self.subscriptions.remove(&(id, chain_id)); - } - } else { + let Some(subscription_state) = self.subscriptions.get_mut(&(id, chain_id)) else { tracing::warn!( target: LOG_TARGET, - "Subscription response id={id} chain={chain_id:?} is not tracked", + "Subscription response id={id} chain={chain_id:?} method={method} is not tracked", ); + return; + }; + if subscription_state.sender.send(result).is_ok() { + // Nothing else to do, user is informed about the notification. + return; + } + + // User dropped the receiver, unsubscribe from the method and remove internal tracking. + let Some(subscription_state) = self.subscriptions.remove(&(id, chain_id)) else { + // State is checked to be some above, so this should never happen. + return; + }; + // Make a call to unsubscribe from this method. + let unsub_id = self.next_id(chain_id); + let request = format!( + r#"{{"jsonrpc":"2.0","id":"{}", "method":"{}","params":["{}"]}}"#, + unsub_id, subscription_state.unsubscribe_method, id + ); + + if let Err(err) = self.client.json_rpc_request(request, chain_id) { + tracing::warn!( + target: LOG_TARGET, + "Failed to unsubscribe id={id:?} chain={chain_id:?} method={:?} err={err:?}", subscription_state.unsubscribe_method + ); + } else { + tracing::debug!(target: LOG_TARGET,"Unsubscribe id={id:?} chain={chain_id:?} method={:?}", subscription_state.unsubscribe_method); } } Err(err) => { diff --git a/lightclient/src/client.rs b/lightclient/src/client.rs index 0269986384..f06c968b66 100644 --- a/lightclient/src/client.rs +++ b/lightclient/src/client.rs @@ -179,6 +179,7 @@ impl LightClientRpc { &self, method: String, params: String, + unsubscribe_method: String, ) -> Result< ( oneshot::Receiver, @@ -191,6 +192,7 @@ impl LightClientRpc { self.to_backend.send(FromSubxt::Subscription { method, + unsubscribe_method, params, sub_id, sender, diff --git a/lightclient/src/platform/wasm_platform.rs b/lightclient/src/platform/wasm_platform.rs index edc5fe2558..ebebff5532 100644 --- a/lightclient/src/platform/wasm_platform.rs +++ b/lightclient/src/platform/wasm_platform.rs @@ -124,14 +124,14 @@ impl PlatformRef for SubxtPlatform { port, } => { let addr = SocketAddr::from((ip, port)); - format!("ws://{}", addr.to_string()) + format!("ws://{}", addr) } Address::WebSocketIp { ip: IpAddr::V6(ip), port, } => { let addr = SocketAddr::from((ip, port)); - format!("ws://{}", addr.to_string()) + format!("ws://{}", addr) } // The API user of the `PlatformRef` trait is never supposed to open connections of diff --git a/lightclient/src/platform/wasm_socket.rs b/lightclient/src/platform/wasm_socket.rs index abe67e7c2d..74bb6b8c08 100644 --- a/lightclient/src/platform/wasm_socket.rs +++ b/lightclient/src/platform/wasm_socket.rs @@ -111,7 +111,7 @@ impl WasmSocket { let mut inner = inner.lock().expect("Mutex is poised; qed"); let bytes = js_sys::Uint8Array::new(&buffer).to_vec(); - inner.data.extend(bytes.into_iter()); + inner.data.extend(bytes); if let Some(waker) = inner.waker.take() { waker.wake(); diff --git a/subxt/src/client/light_client/builder.rs b/subxt/src/client/light_client/builder.rs index 2733fd4bb4..68db1631e1 100644 --- a/subxt/src/client/light_client/builder.rs +++ b/subxt/src/client/light_client/builder.rs @@ -6,11 +6,13 @@ use super::{rpc::LightClientRpc, LightClient, LightClientError}; use crate::backend::rpc::RpcClient; use crate::client::RawLightClient; use crate::macros::{cfg_jsonrpsee_native, cfg_jsonrpsee_web}; -use crate::utils::validate_url_is_secure; use crate::{config::Config, error::Error, OnlineClient}; use std::num::NonZeroU32; use subxt_lightclient::{smoldot, AddedChain}; +#[cfg(feature = "jsonrpsee")] +use crate::utils::validate_url_is_secure; + /// Builder for [`LightClient`]. #[derive(Clone, Debug)] pub struct LightClientBuilder { @@ -186,16 +188,11 @@ impl LightClientBuilder { } /// Raw builder for [`RawLightClient`]. +#[derive(Default)] pub struct RawLightClientBuilder { chains: Vec, } -impl Default for RawLightClientBuilder { - fn default() -> Self { - Self { chains: Vec::new() } - } -} - impl RawLightClientBuilder { /// Create a new [`RawLightClientBuilder`]. pub fn new() -> RawLightClientBuilder { diff --git a/subxt/src/client/light_client/rpc.rs b/subxt/src/client/light_client/rpc.rs index 1186528ec6..ea9fe996ba 100644 --- a/subxt/src/client/light_client/rpc.rs +++ b/subxt/src/client/light_client/rpc.rs @@ -56,8 +56,7 @@ impl LightClientRpc { pub fn new( config: smoldot::AddChainConfig<'_, (), impl Iterator>, ) -> Result { - let rpc = subxt_lightclient::LightClientRpc::new(config) - .map_err(|err| LightClientError::Rpc(err))?; + let rpc = subxt_lightclient::LightClientRpc::new(config).map_err(LightClientError::Rpc)?; Ok(LightClientRpc(rpc)) } @@ -106,7 +105,7 @@ impl RpcClientT for LightClientRpc { &'a self, sub: &'a str, params: Option>, - _unsub: &'a str, + unsub: &'a str, ) -> RawRpcFuture<'a, RawRpcSubscription> { let client = self.clone(); let chain_id = self.chain_id(); @@ -130,7 +129,7 @@ impl RpcClientT for LightClientRpc { // Fails if the background is closed. let (sub_id, notif) = client .0 - .subscription_request(sub.to_string(), params) + .subscription_request(sub.to_string(), params, unsub.to_string()) .map_err(|_| RpcError::ClientError(Box::new(LightClientError::BackgroundClosed)))?; // Fails if the background is closed. diff --git a/testing/integration-tests/src/utils/node_proc.rs b/testing/integration-tests/src/utils/node_proc.rs index 1b812fff46..d3ac752fec 100644 --- a/testing/integration-tests/src/utils/node_proc.rs +++ b/testing/integration-tests/src/utils/node_proc.rs @@ -258,5 +258,5 @@ async fn build_light_client(proc: &SubstrateNode) -> Result