Update to jsonrpsee 0.7 and impl Stream on TransactionProgress (#380)

* Update to jsonrpsee 0.7 and impl Stream on TransactionProgress

* Fix doc links after rename to next_item

* Add another doc comment
This commit is contained in:
James Wilson
2022-01-04 15:05:53 +00:00
committed by GitHub
parent 3fb4c7089a
commit 26258c84cc
8 changed files with 126 additions and 100 deletions
+3 -2
View File
@@ -51,8 +51,9 @@ jobs:
with:
# Use this issue template:
filename: .github/issue_templates/nightly_run_failed.md
# Don't create a new issue; skip updating existing:
update_existing: false
# Update existing issue if found; hopefully will make it clearer
# that it is still an issue:
update_existing: true
# Look for new *open* issues in this search (we want to
# create a new one if we only find closed versions):
search_existing: open
+1 -1
View File
@@ -24,7 +24,7 @@ chameleon = "0.1.0"
scale-info = { version = "1.0.0", features = ["bit-vec"] }
futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { version = "0.5.1", features = ["macros", "ws-client", "http-client"] }
jsonrpsee = { version = "0.7.0", features = ["macros", "ws-client", "http-client"] }
log = "0.4.14"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.124", features = ["derive"] }
+3 -1
View File
@@ -22,6 +22,7 @@
//! polkadot --dev --tmp
//! ```
use futures::StreamExt;
use sp_keyring::AccountKeyring;
use subxt::{
ClientBuilder,
@@ -144,7 +145,8 @@ async fn handle_transfer_events() -> Result<(), Box<dyn std::error::Error>> {
.sign_and_submit_then_watch(&signer)
.await?;
while let Some(ev) = balance_transfer_progress.next().await? {
while let Some(ev) = balance_transfer_progress.next().await {
let ev = ev?;
use subxt::TransactionStatus::*;
// Made it into a block, but not finalized.
+1 -1
View File
@@ -22,7 +22,7 @@ use crate::{
},
Metadata,
};
use jsonrpsee::types::Error as RequestError;
use jsonrpsee::core::error::Error as RequestError;
use sp_core::crypto::SecretStringError;
use sp_runtime::{
transaction_validity::TransactionValidityError,
+18 -20
View File
@@ -33,25 +33,23 @@ use core::{
};
use frame_metadata::RuntimeMetadataPrefixed;
use jsonrpsee::{
core::{
client::{
Client,
ClientT,
Subscription,
SubscriptionClientT,
},
to_json_value,
DeserializeOwned,
Error as RpcError,
JsonValue,
},
http_client::{
HttpClient,
HttpClientBuilder,
},
types::{
to_json_value,
traits::{
Client,
SubscriptionClient,
},
DeserializeOwned,
Error as RpcError,
JsonValue,
Subscription,
},
ws_client::{
WsClient,
WsClientBuilder,
},
ws_client::WsClientBuilder,
};
use serde::{
Deserialize,
@@ -172,7 +170,7 @@ pub enum SubstrateTransactionStatus<Hash, BlockHash> {
#[derive(Clone)]
pub enum RpcClient {
/// JSONRPC client WebSocket transport.
WebSocket(Arc<WsClient>),
WebSocket(Arc<Client>),
/// JSONRPC client HTTP transport.
// NOTE: Arc because `HttpClient` is not clone.
Http(Arc<HttpClient>),
@@ -239,14 +237,14 @@ impl RpcClient {
}
}
impl From<WsClient> for RpcClient {
fn from(client: WsClient) -> Self {
impl From<Client> for RpcClient {
fn from(client: Client) -> Self {
RpcClient::WebSocket(Arc::new(client))
}
}
impl From<Arc<WsClient>> for RpcClient {
fn from(client: Arc<WsClient>) -> Self {
impl From<Arc<Client>> for RpcClient {
fn from(client: Arc<Client>) -> Self {
RpcClient::WebSocket(client)
}
}
+5 -5
View File
@@ -14,9 +14,9 @@
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
use jsonrpsee::types::{
use jsonrpsee::core::{
client::Subscription,
DeserializeOwned,
Subscription,
};
use sp_core::{
storage::{
@@ -247,12 +247,12 @@ where
T: DeserializeOwned,
{
match sub.next().await {
Ok(Some(next)) => Some(next),
Ok(None) => None,
Err(e) => {
Some(Ok(next)) => Some(next),
Some(Err(e)) => {
log::error!("Subscription {} failed: {:?} dropping", sub_name, e);
None
}
None => None,
}
}
+93 -68
View File
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with subxt. If not, see <http://www.gnu.org/licenses/>.
use std::task::Poll;
use sp_core::storage::StorageKey;
use sp_runtime::traits::Hash;
pub use sp_runtime::traits::SignedExtension;
@@ -30,9 +32,13 @@ use crate::{
Config,
Phase,
};
use jsonrpsee::types::{
use futures::{
Stream,
StreamExt,
};
use jsonrpsee::core::{
client::Subscription as RpcSubscription,
Error as RpcError,
Subscription as RpcSubscription,
};
/// This struct represents a subscription to the progress of some transaction, and is
@@ -44,6 +50,11 @@ pub struct TransactionProgress<'client, T: Config> {
client: &'client Client<T>,
}
// The above type is not `Unpin` by default unless the generic param `T` is,
// so we manually make it clear that Unpin is actually fine regardless of `T`
// (we don't care if this moves around in memory while it's "pinned").
impl<'client, T: Config> Unpin for TransactionProgress<'client, T> {}
impl<'client, T: Config> TransactionProgress<'client, T> {
pub(crate) fn new(
sub: RpcSubscription<SubstrateTransactionStatus<T::Hash, T::Hash>>,
@@ -57,61 +68,13 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
}
}
/// Return the next transaction status when it's emitted.
pub async fn next(&mut self) -> Result<Option<TransactionStatus<'client, T>>, Error> {
// Return `None` if the subscription has been dropped:
let sub = match &mut self.sub {
Some(sub) => sub,
None => return Ok(None),
};
// Return the next item otherwise:
let res = sub.next().await?;
Ok(res.map(|status| {
match status {
SubstrateTransactionStatus::Future => TransactionStatus::Future,
SubstrateTransactionStatus::Ready => TransactionStatus::Ready,
SubstrateTransactionStatus::Broadcast(peers) => {
TransactionStatus::Broadcast(peers)
}
SubstrateTransactionStatus::InBlock(hash) => {
TransactionStatus::InBlock(TransactionInBlock {
block_hash: hash,
ext_hash: self.ext_hash,
client: self.client,
})
}
SubstrateTransactionStatus::Retracted(hash) => {
TransactionStatus::Retracted(hash)
}
SubstrateTransactionStatus::Usurped(hash) => {
TransactionStatus::Usurped(hash)
}
SubstrateTransactionStatus::Dropped => TransactionStatus::Dropped,
SubstrateTransactionStatus::Invalid => TransactionStatus::Invalid,
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TransactionStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually.
//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
// Equally, a transaction `Dropped` from one node may still be in the transaction pool,
// and make it into a block, on another node. Likewise with `Usurped`.
SubstrateTransactionStatus::FinalityTimeout(hash) => {
self.sub = None;
TransactionStatus::FinalityTimeout(hash)
}
SubstrateTransactionStatus::Finalized(hash) => {
self.sub = None;
TransactionStatus::Finalized(TransactionInBlock {
block_hash: hash,
ext_hash: self.ext_hash,
client: self.client,
})
}
}
}))
/// Return the next transaction status when it's emitted. This just delegates to the
/// [`futures::Stream`] implementation for [`TransactionProgress`], but allows you to
/// avoid importing that trait if you don't otherwise need it.
pub async fn next_item(
&mut self,
) -> Option<Result<TransactionStatus<'client, T>, Error>> {
self.next().await
}
/// Wait for the transaction to be in a block (but not necessarily finalized), and return
@@ -119,17 +82,17 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
/// waiting for this to happen.
///
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TransactionProgress::next()`] instead.
/// transaction progresses, use [`TransactionProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_in_block(
mut self,
) -> Result<TransactionInBlock<'client, T>, Error> {
while let Some(status) = self.next().await? {
match status {
while let Some(status) = self.next_item().await {
match status? {
// Finalized or otherwise in a block! Return.
TransactionStatus::InBlock(s) | TransactionStatus::Finalized(s) => {
return Ok(s)
@@ -149,17 +112,17 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
/// instance when it is, or an error if there was a problem waiting for finalization.
///
/// **Note:** consumes `self`. If you'd like to perform multiple actions as the state of the
/// transaction progresses, use [`TransactionProgress::next()`] instead.
/// transaction progresses, use [`TransactionProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized(
mut self,
) -> Result<TransactionInBlock<'client, T>, Error> {
while let Some(status) = self.next().await? {
match status {
while let Some(status) = self.next_item().await {
match status? {
// Finalized! Return.
TransactionStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
@@ -178,24 +141,86 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
/// as well as a couple of other details (block hash and extrinsic hash).
///
/// **Note:** consumes self. If you'd like to perform multiple actions as progress is made,
/// use [`TransactionProgress::next()`] instead.
/// use [`TransactionProgress::next_item()`] instead.
///
/// **Note:** transaction statuses like `Invalid` and `Usurped` are ignored, because while they
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
/// level [`TransactionProgress::next_item()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized_success(self) -> Result<TransactionEvents<T>, Error> {
let evs = self.wait_for_finalized().await?.wait_for_success().await?;
Ok(evs)
}
}
impl<'client, T: Config> Stream for TransactionProgress<'client, T> {
type Item = Result<TransactionStatus<'client, T>, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let sub = match self.sub.as_mut() {
Some(sub) => sub,
None => return Poll::Ready(None),
};
sub.poll_next_unpin(cx)
.map_err(|e| e.into())
.map_ok(|status| {
match status {
SubstrateTransactionStatus::Future => TransactionStatus::Future,
SubstrateTransactionStatus::Ready => TransactionStatus::Ready,
SubstrateTransactionStatus::Broadcast(peers) => {
TransactionStatus::Broadcast(peers)
}
SubstrateTransactionStatus::InBlock(hash) => {
TransactionStatus::InBlock(TransactionInBlock {
block_hash: hash,
ext_hash: self.ext_hash,
client: self.client,
})
}
SubstrateTransactionStatus::Retracted(hash) => {
TransactionStatus::Retracted(hash)
}
SubstrateTransactionStatus::Usurped(hash) => {
TransactionStatus::Usurped(hash)
}
SubstrateTransactionStatus::Dropped => TransactionStatus::Dropped,
SubstrateTransactionStatus::Invalid => TransactionStatus::Invalid,
// Only the following statuses are actually considered "final" (see the substrate
// docs on `TransactionStatus`). Basically, either the transaction makes it into a
// block, or we eventually give up on waiting for it to make it into a block.
// Even `Dropped`/`Invalid`/`Usurped` transactions might make it into a block eventually.
//
// As an example, a transaction that is `Invalid` on one node due to having the wrong
// nonce might still be valid on some fork on another node which ends up being finalized.
// Equally, a transaction `Dropped` from one node may still be in the transaction pool,
// and make it into a block, on another node. Likewise with `Usurped`.
SubstrateTransactionStatus::FinalityTimeout(hash) => {
self.sub = None;
TransactionStatus::FinalityTimeout(hash)
}
SubstrateTransactionStatus::Finalized(hash) => {
self.sub = None;
TransactionStatus::Finalized(TransactionInBlock {
block_hash: hash,
ext_hash: self.ext_hash,
client: self.client,
})
}
}
})
}
}
//* Dev note: The below is adapted from the substrate docs on `TransactionStatus`, which this
//* enum was adapted from (and which is an exact copy of `SubstrateTransactionStatus` in this crate).
//* Note that the number of finality watchers is, at the time of writing, found in the constant
//* `MAX_FINALITY_WATCHERS` in the `sc_transaction_pool` crate.
//*
/// Possible transaction statuses returned from our [`TransactionProgress::next()`] call.
/// Possible transaction statuses returned from our [`TransactionProgress::next_item()`] call.
///
/// These status events can be grouped based on their kinds as:
///
+2 -2
View File
@@ -84,7 +84,7 @@ async fn chain_subscribe_blocks() {
let node_process = test_node_process().await;
let client = node_process.client();
let mut blocks = client.rpc().subscribe_blocks().await.unwrap();
blocks.next().await.unwrap();
blocks.next().await.unwrap().unwrap();
}
#[async_std::test]
@@ -92,7 +92,7 @@ async fn chain_subscribe_finalized_blocks() {
let node_process = test_node_process().await;
let client = node_process.client();
let mut blocks = client.rpc().subscribe_finalized_blocks().await.unwrap();
blocks.next().await.unwrap();
blocks.next().await.unwrap().unwrap();
}
#[async_std::test]