diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 4a52fa81a2..8062a677dc 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -51,8 +51,9 @@ jobs: with: # Use this issue template: filename: .github/issue_templates/nightly_run_failed.md - # Don't create a new issue; skip updating existing: - update_existing: false + # Update existing issue if found; hopefully will make it clearer + # that it is still an issue: + update_existing: true # Look for new *open* issues in this search (we want to # create a new one if we only find closed versions): search_existing: open diff --git a/Cargo.toml b/Cargo.toml index 604562f4c7..776121b2b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ chameleon = "0.1.0" scale-info = { version = "1.0.0", features = ["bit-vec"] } futures = "0.3.13" hex = "0.4.3" -jsonrpsee = { version = "0.5.1", features = ["macros", "ws-client", "http-client"] } +jsonrpsee = { version = "0.7.0", features = ["macros", "ws-client", "http-client"] } log = "0.4.14" num-traits = { version = "0.2.14", default-features = false } serde = { version = "1.0.124", features = ["derive"] } diff --git a/examples/submit_and_watch.rs b/examples/submit_and_watch.rs index cda3214820..ddb9c97c6c 100644 --- a/examples/submit_and_watch.rs +++ b/examples/submit_and_watch.rs @@ -22,6 +22,7 @@ //! polkadot --dev --tmp //! ``` +use futures::StreamExt; use sp_keyring::AccountKeyring; use subxt::{ ClientBuilder, @@ -144,7 +145,8 @@ async fn handle_transfer_events() -> Result<(), Box> { .sign_and_submit_then_watch(&signer) .await?; - while let Some(ev) = balance_transfer_progress.next().await? { + while let Some(ev) = balance_transfer_progress.next().await { + let ev = ev?; use subxt::TransactionStatus::*; // Made it into a block, but not finalized. diff --git a/src/error.rs b/src/error.rs index bef8b18aff..2d16b7f29a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -22,7 +22,7 @@ use crate::{ }, Metadata, }; -use jsonrpsee::types::Error as RequestError; +use jsonrpsee::core::error::Error as RequestError; use sp_core::crypto::SecretStringError; use sp_runtime::{ transaction_validity::TransactionValidityError, diff --git a/src/rpc.rs b/src/rpc.rs index 39e7d8bbc5..310adcad09 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -33,25 +33,23 @@ use core::{ }; use frame_metadata::RuntimeMetadataPrefixed; use jsonrpsee::{ + core::{ + client::{ + Client, + ClientT, + Subscription, + SubscriptionClientT, + }, + to_json_value, + DeserializeOwned, + Error as RpcError, + JsonValue, + }, http_client::{ HttpClient, HttpClientBuilder, }, - types::{ - to_json_value, - traits::{ - Client, - SubscriptionClient, - }, - DeserializeOwned, - Error as RpcError, - JsonValue, - Subscription, - }, - ws_client::{ - WsClient, - WsClientBuilder, - }, + ws_client::WsClientBuilder, }; use serde::{ Deserialize, @@ -172,7 +170,7 @@ pub enum SubstrateTransactionStatus { #[derive(Clone)] pub enum RpcClient { /// JSONRPC client WebSocket transport. - WebSocket(Arc), + WebSocket(Arc), /// JSONRPC client HTTP transport. // NOTE: Arc because `HttpClient` is not clone. Http(Arc), @@ -239,14 +237,14 @@ impl RpcClient { } } -impl From for RpcClient { - fn from(client: WsClient) -> Self { +impl From for RpcClient { + fn from(client: Client) -> Self { RpcClient::WebSocket(Arc::new(client)) } } -impl From> for RpcClient { - fn from(client: Arc) -> Self { +impl From> for RpcClient { + fn from(client: Arc) -> Self { RpcClient::WebSocket(client) } } diff --git a/src/subscription.rs b/src/subscription.rs index f561e97093..693c160a6a 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU General Public License // along with subxt. If not, see . -use jsonrpsee::types::{ +use jsonrpsee::core::{ + client::Subscription, DeserializeOwned, - Subscription, }; use sp_core::{ storage::{ @@ -247,12 +247,12 @@ where T: DeserializeOwned, { match sub.next().await { - Ok(Some(next)) => Some(next), - Ok(None) => None, - Err(e) => { + Some(Ok(next)) => Some(next), + Some(Err(e)) => { log::error!("Subscription {} failed: {:?} dropping", sub_name, e); None } + None => None, } } diff --git a/src/transaction.rs b/src/transaction.rs index 8ca385288c..625a0e4fd8 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with subxt. If not, see . +use std::task::Poll; + use sp_core::storage::StorageKey; use sp_runtime::traits::Hash; pub use sp_runtime::traits::SignedExtension; @@ -30,9 +32,13 @@ use crate::{ Config, Phase, }; -use jsonrpsee::types::{ +use futures::{ + Stream, + StreamExt, +}; +use jsonrpsee::core::{ + client::Subscription as RpcSubscription, Error as RpcError, - Subscription as RpcSubscription, }; /// This struct represents a subscription to the progress of some transaction, and is @@ -44,6 +50,11 @@ pub struct TransactionProgress<'client, T: Config> { client: &'client Client, } +// The above type is not `Unpin` by default unless the generic param `T` is, +// so we manually make it clear that Unpin is actually fine regardless of `T` +// (we don't care if this moves around in memory while it's "pinned"). +impl<'client, T: Config> Unpin for TransactionProgress<'client, T> {} + impl<'client, T: Config> TransactionProgress<'client, T> { pub(crate) fn new( sub: RpcSubscription>, @@ -57,61 +68,13 @@ impl<'client, T: Config> TransactionProgress<'client, T> { } } - /// Return the next transaction status when it's emitted. - pub async fn next(&mut self) -> Result>, Error> { - // Return `None` if the subscription has been dropped: - let sub = match &mut self.sub { - Some(sub) => sub, - None => return Ok(None), - }; - - // Return the next item otherwise: - let res = sub.next().await?; - Ok(res.map(|status| { - match status { - SubstrateTransactionStatus::Future => TransactionStatus::Future, - SubstrateTransactionStatus::Ready => TransactionStatus::Ready, - SubstrateTransactionStatus::Broadcast(peers) => { - TransactionStatus::Broadcast(peers) - } - SubstrateTransactionStatus::InBlock(hash) => { - TransactionStatus::InBlock(TransactionInBlock { - block_hash: hash, - ext_hash: self.ext_hash, - client: self.client, - }) - } - SubstrateTransactionStatus::Retracted(hash) => { - TransactionStatus::Retracted(hash) - } - SubstrateTransactionStatus::Usurped(hash) => { - TransactionStatus::Usurped(hash) - } - SubstrateTransactionStatus::Dropped => TransactionStatus::Dropped, - SubstrateTransactionStatus::Invalid => TransactionStatus::Invalid, - // Only the following statuses are actually considered "final" (see the substrate - // docs on `TransactionStatus`). Basically, either the transaction makes it into a - // block, or we eventually give up on waiting for it to make it into a block. - // Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually. - // - // As an example, a transaction that is `Invalid` on one node due to having the wrong - // nonce might still be valid on some fork on another node which ends up being finalized. - // Equally, a transaction `Dropped` from one node may still be in the transaction pool, - // and make it into a block, on another node. Likewise with `Usurped`. - SubstrateTransactionStatus::FinalityTimeout(hash) => { - self.sub = None; - TransactionStatus::FinalityTimeout(hash) - } - SubstrateTransactionStatus::Finalized(hash) => { - self.sub = None; - TransactionStatus::Finalized(TransactionInBlock { - block_hash: hash, - ext_hash: self.ext_hash, - client: self.client, - }) - } - } - })) + /// Return the next transaction status when it's emitted. This just delegates to the + /// [`futures::Stream`] implementation for [`TransactionProgress`], but allows you to + /// avoid importing that trait if you don't otherwise need it. + pub async fn next_item( + &mut self, + ) -> Option, Error>> { + self.next().await } /// Wait for the transaction to be in a block (but not necessarily finalized), and return @@ -119,17 +82,17 @@ impl<'client, T: Config> TransactionProgress<'client, T> { /// waiting for this to happen. /// /// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the - /// transaction progresses, use [`TransactionProgress::next()`] instead. + /// transaction progresses, use [`TransactionProgress::next_item()`] instead. /// /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they /// may well indicate with some probability that the transaction will not make it into a block, /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself. + /// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself. pub async fn wait_for_in_block( mut self, ) -> Result, Error> { - while let Some(status) = self.next().await? { - match status { + while let Some(status) = self.next_item().await { + match status? { // Finalized or otherwise in a block! Return. TransactionStatus::InBlock(s) | TransactionStatus::Finalized(s) => { return Ok(s) @@ -149,17 +112,17 @@ impl<'client, T: Config> TransactionProgress<'client, T> { /// instance when it is, or an error if there was a problem waiting for finalization. /// /// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the - /// transaction progresses, use [`TransactionProgress::next()`] instead. + /// transaction progresses, use [`TransactionProgress::next_item()`] instead. /// /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they /// may well indicate with some probability that the transaction will not make it into a block, /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself. + /// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself. pub async fn wait_for_finalized( mut self, ) -> Result, Error> { - while let Some(status) = self.next().await? { - match status { + while let Some(status) = self.next_item().await { + match status? { // Finalized! Return. TransactionStatus::Finalized(s) => return Ok(s), // Error scenarios; return the error. @@ -178,24 +141,86 @@ impl<'client, T: Config> TransactionProgress<'client, T> { /// as well as a couple of other details (block hash and extrinsic hash). /// /// **Note:** consumes self. If you'd like to perform multiple actions as progress is made, - /// use [`TransactionProgress::next()`] instead. + /// use [`TransactionProgress::next_item()`] instead. /// /// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they /// may well indicate with some probability that the transaction will not make it into a block, /// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower - /// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself. + /// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself. pub async fn wait_for_finalized_success(self) -> Result, Error> { let evs = self.wait_for_finalized().await?.wait_for_success().await?; Ok(evs) } } +impl<'client, T: Config> Stream for TransactionProgress<'client, T> { + type Item = Result, Error>; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let sub = match self.sub.as_mut() { + Some(sub) => sub, + None => return Poll::Ready(None), + }; + + sub.poll_next_unpin(cx) + .map_err(|e| e.into()) + .map_ok(|status| { + match status { + SubstrateTransactionStatus::Future => TransactionStatus::Future, + SubstrateTransactionStatus::Ready => TransactionStatus::Ready, + SubstrateTransactionStatus::Broadcast(peers) => { + TransactionStatus::Broadcast(peers) + } + SubstrateTransactionStatus::InBlock(hash) => { + TransactionStatus::InBlock(TransactionInBlock { + block_hash: hash, + ext_hash: self.ext_hash, + client: self.client, + }) + } + SubstrateTransactionStatus::Retracted(hash) => { + TransactionStatus::Retracted(hash) + } + SubstrateTransactionStatus::Usurped(hash) => { + TransactionStatus::Usurped(hash) + } + SubstrateTransactionStatus::Dropped => TransactionStatus::Dropped, + SubstrateTransactionStatus::Invalid => TransactionStatus::Invalid, + // Only the following statuses are actually considered "final" (see the substrate + // docs on `TransactionStatus`). Basically, either the transaction makes it into a + // block, or we eventually give up on waiting for it to make it into a block. + // Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually. + // + // As an example, a transaction that is `Invalid` on one node due to having the wrong + // nonce might still be valid on some fork on another node which ends up being finalized. + // Equally, a transaction `Dropped` from one node may still be in the transaction pool, + // and make it into a block, on another node. Likewise with `Usurped`. + SubstrateTransactionStatus::FinalityTimeout(hash) => { + self.sub = None; + TransactionStatus::FinalityTimeout(hash) + } + SubstrateTransactionStatus::Finalized(hash) => { + self.sub = None; + TransactionStatus::Finalized(TransactionInBlock { + block_hash: hash, + ext_hash: self.ext_hash, + client: self.client, + }) + } + } + }) + } +} + //* Dev note: The below is adapted from the substrate docs on `TransactionStatus`, which this //* enum was adapted from (and which is an exact copy of `SubstrateTransactionStatus` in this crate). //* Note that the number of finality watchers is, at the time of writing, found in the constant //* `MAX_FINALITY_WATCHERS` in the `sc_transaction_pool` crate. //* -/// Possible transaction statuses returned from our [`TransactionProgress::next()`] call. +/// Possible transaction statuses returned from our [`TransactionProgress::next_item()`] call. /// /// These status events can be grouped based on their kinds as: /// diff --git a/tests/integration/client.rs b/tests/integration/client.rs index a583d3586b..66898837c6 100644 --- a/tests/integration/client.rs +++ b/tests/integration/client.rs @@ -84,7 +84,7 @@ async fn chain_subscribe_blocks() { let node_process = test_node_process().await; let client = node_process.client(); let mut blocks = client.rpc().subscribe_blocks().await.unwrap(); - blocks.next().await.unwrap(); + blocks.next().await.unwrap().unwrap(); } #[async_std::test] @@ -92,7 +92,7 @@ async fn chain_subscribe_finalized_blocks() { let node_process = test_node_process().await; let client = node_process.client(); let mut blocks = client.rpc().subscribe_finalized_blocks().await.unwrap(); - blocks.next().await.unwrap(); + blocks.next().await.unwrap().unwrap(); } #[async_std::test]