From 08a3e6574d38ed7908e163ac554b81b59e0b8a67 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 8 Jun 2021 19:15:12 +0200 Subject: [PATCH] deps: update jsonrpsee 0.2.0 (#285) * deps: update jsonrpsee 0.2.0 The motivation is to avoid pinning certain alpha versions and to avoid breaking users builds without having to some `Cargo.lock` updating. * cargo fmt * fix tests * fix a few clippy lints * cargo fmt --- Cargo.toml | 15 ++++++------- client/Cargo.toml | 2 +- client/src/lib.rs | 51 +++++++++++++++++++++------------------------ src/lib.rs | 2 +- src/rpc.rs | 18 ++++++++-------- src/subscription.rs | 31 ++++++++++++++++++++++++--- src/tests/mod.rs | 4 ++-- 7 files changed, 73 insertions(+), 50 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1fa4890b6e..913d7d98fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,9 +19,9 @@ include = ["Cargo.toml", "src/**/*.rs", "README.md", "LICENSE"] [features] default = ["tokio1"] client = ["substrate-subxt-client"] -# jsonrpsee http client can be configured to use tokio02 or tokio1. -tokio02 = ["jsonrpsee-http-client/tokio02"] -tokio1 = ["jsonrpsee-http-client/tokio1"] +# jsonrpsee can be configured to use tokio02 or tokio1. +tokio02 = ["jsonrpsee-http-client/tokio02", "jsonrpsee-ws-client/tokio02"] +tokio1 = ["jsonrpsee-http-client/tokio1", "jsonrpsee-ws-client/tokio1"] [dependencies] async-trait = "0.1.49" @@ -29,9 +29,10 @@ codec = { package = "parity-scale-codec", version = "2.1", default-features = fa dyn-clone = "1.0.4" futures = "0.3.13" hex = "0.4.3" -jsonrpsee-proc-macros = "=0.2.0-alpha.6" -jsonrpsee-ws-client = "=0.2.0-alpha.6" -jsonrpsee-http-client = { version = "=0.2.0-alpha.6", default-features = false } +jsonrpsee-proc-macros = "0.2.0" +jsonrpsee-ws-client = { version = "0.2.0", default-features = false } +jsonrpsee-http-client = { version = "0.2.0", default-features = false } +jsonrpsee-types = "0.2.0" log = "0.4.14" num-traits = { version = "0.2.14", default-features = false } serde = { version = "1.0.124", features = ["derive"] } @@ -56,7 +57,7 @@ pallet-staking = "3.0.0" [dev-dependencies] assert_matches = "1.5.0" -async-std = { version = "1.9.0", features = ["attributes"] } +async-std = { version = "1.9.0", features = ["attributes", "tokio1"] } env_logger = "0.8.3" tempdir = "0.3.7" wabt = "0.10.0" diff --git a/client/Cargo.toml b/client/Cargo.toml index f02963152c..b9e2f4bb41 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -15,7 +15,7 @@ keywords = ["parity", "substrate", "blockchain"] async-std = "1.8.0" futures = { version = "0.3.9", features = ["compat"], package = "futures" } futures01 = { package = "futures", version = "0.1.29" } -jsonrpsee-types = "=0.2.0-alpha.6" +jsonrpsee-types = "0.2.0" log = "0.4.13" serde_json = "1.0.61" thiserror = "1.0.23" diff --git a/client/src/lib.rs b/client/src/lib.rs index c6ad4fa32c..3c7461b3d2 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -42,7 +42,7 @@ use futures01::sync::mpsc as mpsc01; use jsonrpsee_types::{ v2::{ error::{ - JsonRpcErrorAlloc, + JsonRpcError, JsonRpcErrorCode, }, params::{ @@ -51,15 +51,14 @@ use jsonrpsee_types::{ SubscriptionId, TwoPointZero, }, - parse_request_id, request::{ JsonRpcCallSer, JsonRpcInvalidRequest, JsonRpcNotificationSer, }, response::{ - JsonRpcNotifResponse, JsonRpcResponse, + JsonRpcSubscriptionResponseAlloc, }, }, DeserializeOwned, @@ -68,6 +67,7 @@ use jsonrpsee_types::{ JsonValue, RequestMessage, Subscription, + SubscriptionKind, SubscriptionMessage, }; use sc_network::config::TransportConfig; @@ -94,7 +94,6 @@ use sc_service::{ }; use std::{ collections::HashMap, - marker::PhantomData, sync::atomic::{ AtomicU64, Ordering, @@ -211,7 +210,7 @@ impl SubxtClient { while let Some(Ok(response)) = from_back.next().await { let notif = serde_json::from_str::< - JsonRpcNotifResponse, + JsonRpcSubscriptionResponseAlloc, >( &response ) @@ -241,7 +240,7 @@ impl SubxtClient { let _ = rpc.rpc_query(&session, &message).await; } } - FrontToBack::Batch(_) => (), + _ => (), } } })), @@ -280,7 +279,7 @@ impl SubxtClient { .clone() .send(FrontToBack::Notification(msg)) .await - .map_err(|e| JsonRpseeError::TransportError(Box::new(e))) + .map_err(|e| JsonRpseeError::Transport(Box::new(e))) } /// Send a JSONRPC request. @@ -306,12 +305,12 @@ impl SubxtClient { send_back: Some(send_back_tx), })) .await - .map_err(|e| JsonRpseeError::TransportError(Box::new(e)))?; + .map_err(|e| JsonRpseeError::Transport(Box::new(e)))?; let json_value = match send_back_rx.await { Ok(Ok(v)) => v, Ok(Err(err)) => return Err(err), - Err(err) => return Err(JsonRpseeError::TransportError(Box::new(err))), + Err(err) => return Err(JsonRpseeError::Transport(Box::new(err))), }; serde_json::from_value(json_value).map_err(JsonRpseeError::ParseError) } @@ -351,14 +350,13 @@ impl SubxtClient { let (notifs_rx, id) = match send_back_rx.await { Ok(Ok(val)) => val, Ok(Err(err)) => return Err(err), - Err(err) => return Err(JsonRpseeError::TransportError(Box::new(err))), + Err(err) => return Err(JsonRpseeError::Transport(Box::new(err))), }; - Ok(Subscription { - to_back: self.to_back.clone(), + Ok(Subscription::new( + self.to_back.clone(), notifs_rx, - marker: PhantomData, - id, - }) + SubscriptionKind::Subscription(id), + )) } } @@ -512,26 +510,25 @@ fn read_jsonrpc_response( maybe_msg: Option, id: Id, ) -> Option> { - let msg = maybe_msg?; - match serde_json::from_str::>(&msg) { - Ok(rp) => { - match parse_request_id::(rp.id) { - Ok(rp_id) if rp_id == id => Some(Ok(rp.result)), - _ => Some(Err(JsonRpseeError::InvalidRequestId)), - } - } + let msg: String = maybe_msg?; + // NOTE: `let res` is a workaround because rustc otherwise doesn't compile + // `msg` doesn't live long enough. + let res = match serde_json::from_str::>(&msg) { + Ok(rp) if rp.id == id => Some(Ok(rp.result)), + Ok(_) => Some(Err(JsonRpseeError::InvalidRequestId)), Err(_) => { match serde_json::from_str::>(&msg) { Ok(err) => { - let err = JsonRpcErrorAlloc { + let err = JsonRpcError { jsonrpc: TwoPointZero, error: JsonRpcErrorCode::InvalidRequest.into(), - id: parse_request_id(err.id).ok()?, + id: err.id, }; - Some(Err(JsonRpseeError::Request(err))) + Some(Err(JsonRpseeError::Request(err.to_string()))) } Err(_) => None, } } - } + }; + res } diff --git a/src/lib.rs b/src/lib.rs index a8b8f6936c..286e298b1c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -210,7 +210,7 @@ impl ClientBuilder { if url.starts_with("ws://") || url.starts_with("wss://") { let client = WsClientBuilder::default() .max_notifs_per_subscription(4096) - .build(&url) + .build(url) .await?; RpcClient::WebSocket(Arc::new(client)) } else { diff --git a/src/rpc.rs b/src/rpc.rs index 22b8e7e310..2673021fc4 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -31,19 +31,19 @@ use core::{ marker::PhantomData, }; use frame_metadata::RuntimeMetadataPrefixed; -use jsonrpsee_http_client::{ +use jsonrpsee_http_client::HttpClient; +use jsonrpsee_types::{ to_json_value, - traits::Client, + traits::{ + Client, + SubscriptionClient, + }, DeserializeOwned, Error as RpcError, - HttpClient, JsonValue, -}; -use jsonrpsee_ws_client::{ - traits::SubscriptionClient, Subscription, - WsClient, }; +use jsonrpsee_ws_client::WsClient; use serde::{ Deserialize, Serialize, @@ -541,7 +541,7 @@ impl Rpc { }?; let mut xt_sub = self.watch_extrinsic(extrinsic).await?; - while let Some(status) = xt_sub.next().await { + while let Ok(Some(status)) = xt_sub.next().await { log::info!("received status {:?}", status); match status { // ignore in progress extrinsic for now @@ -604,7 +604,7 @@ impl Rpc { ext_hash, )) })?; - let mut sub = EventSubscription::new(events_sub, &decoder); + let mut sub = EventSubscription::new(events_sub, decoder); sub.filter_extrinsic(block_hash, ext_index); let mut events = vec![]; while let Some(event) = sub.next().await { diff --git a/src/subscription.rs b/src/subscription.rs index a6d870739a..2fe7192e56 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -14,7 +14,10 @@ // You should have received a copy of the GNU General Public License // along with substrate-subxt. If not, see . -use jsonrpsee_ws_client::Subscription; +use jsonrpsee_types::{ + DeserializeOwned, + Subscription, +}; use sp_core::{ storage::{ StorageChangeSet, @@ -176,7 +179,9 @@ impl FinalizedEventStorageSubscription { if let Some(storage_change) = self.storage_changes.pop_front() { return Some(storage_change) } - let header: T::Header = self.subscription.next().await?; + let header: T::Header = + read_subscription_response("HeaderSubscription", &mut self.subscription) + .await?; self.storage_changes.extend( self.rpc .query_storage_at(&[self.storage_key.clone()], Some(header.hash())) @@ -199,8 +204,28 @@ impl EventStorageSubscription { /// Gets the next change_set from the subscription. pub async fn next(&mut self) -> Option> { match self { - Self::Imported(event_sub) => event_sub.next().await, + Self::Imported(event_sub) => { + read_subscription_response("StorageChangeSetSubscription", event_sub) + .await + } Self::Finalized(event_sub) => event_sub.next().await, } } } + +async fn read_subscription_response( + sub_name: &str, + sub: &mut Subscription, +) -> Option +where + T: DeserializeOwned, +{ + match sub.next().await { + Ok(Some(next)) => Some(next), + Ok(None) => None, + Err(e) => { + log::error!("Subscription {} failed: {:?} dropping", sub_name, e); + None + } + } +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs index b63d9a58bb..43f59ca7b6 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -141,7 +141,7 @@ async fn test_chain_subscribe_blocks() { let node_process = test_node_process().await; let client = node_process.client(); let mut blocks = client.subscribe_blocks().await.unwrap(); - blocks.next().await; + blocks.next().await.unwrap(); } #[async_std::test] @@ -149,7 +149,7 @@ async fn test_chain_subscribe_finalized_blocks() { let node_process = test_node_process().await; let client = node_process.client(); let mut blocks = client.subscribe_finalized_blocks().await.unwrap(); - blocks.next().await; + blocks.next().await.unwrap(); } #[async_std::test]