txBroadcast: Stabilize to version 1 (#4169)

This PR stabilizes the txBroadcast API to version 1.

Ideally needs:
- https://github.com/paritytech/polkadot-sdk/pull/4050
- https://github.com/paritytech/polkadot-sdk/pull/3772

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2024-04-19 12:28:48 +03:00
committed by GitHub
parent 69f4373178
commit 148d942ec0
3 changed files with 46 additions and 51 deletions
+8
View File
@@ -0,0 +1,8 @@
title: Stabilize transactionBroadcast RPC class to version 1
doc:
- audience: Node Dev
description: |
The transactionBroadcast RPC API is stabilized to version 1.
crates: [ ]
@@ -47,7 +47,8 @@ pub trait TransactionBroadcastApi {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_broadcast", raw_method)]
#[method(name = "transaction_v1_broadcast", raw_method)]
async fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;
/// Broadcast an extrinsic to the chain.
@@ -55,6 +56,6 @@ pub trait TransactionBroadcastApi {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_stop", raw_method)]
#[method(name = "transaction_v1_stop", raw_method)]
async fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
}
@@ -46,12 +46,12 @@ async fn tx_broadcast_enters_pool() {
let xt = hex_string(&uxt.encode());
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
@@ -84,10 +84,7 @@ async fn tx_broadcast_enters_pool() {
// The future broadcast awaits for the finalized status to be reached.
// Force the future to exit by calling stop.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
let _: () = tx_api.call("transaction_v1_stop", rpc_params![&operation_id]).await.unwrap();
// Ensure the broadcast future finishes.
let _ = get_next_event!(&mut exec_middleware.recv);
@@ -101,7 +98,7 @@ async fn tx_broadcast_invalid_tx() {
// Invalid parameters.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8])
.call::<_, serde_json::Value>("transaction_v1_broadcast", [1u8])
.await
.unwrap_err();
assert_matches!(err,
@@ -113,7 +110,7 @@ async fn tx_broadcast_invalid_tx() {
// Invalid transaction that cannot be decoded. The broadcast silently exits.
let xt = "0xdeadbeef";
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
assert_eq!(0, pool.status().ready);
@@ -124,7 +121,7 @@ async fn tx_broadcast_invalid_tx() {
// When the operation is not active, either from the tx being finalized or a
// terminal error; the stop method should return an error.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
.call::<_, serde_json::Value>("transaction_v1_stop", rpc_params![&operation_id])
.await
.unwrap_err();
assert_matches!(err,
@@ -138,7 +135,7 @@ async fn tx_stop_with_invalid_operation_id() {
// Make an invalid stop call.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"])
.call::<_, serde_json::Value>("transaction_v1_stop", ["invalid_operation_id"])
.await
.unwrap_err();
assert_matches!(err,
@@ -161,15 +158,13 @@ async fn tx_broadcast_resubmits_future_nonce_tx() {
let future_uxt = uxt(Alice, ALICE_NONCE + 1);
let future_xt = hex_string(&future_uxt.encode());
let future_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&future_xt])
.await
.unwrap();
let future_operation_id: String =
tx_api.call("transaction_v1_broadcast", rpc_params![&future_xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
@@ -188,13 +183,11 @@ async fn tx_broadcast_resubmits_future_nonce_tx() {
let block_2_header = api.push_block(2, vec![], true);
let block_2 = block_2_header.hash();
let operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&current_xt])
.await
.unwrap();
let operation_id: String =
tx_api.call("transaction_v1_broadcast", rpc_params![&current_xt]).await.unwrap();
assert_ne!(future_operation_id, operation_id);
// Announce block 2 to `transaction_unstable_broadcast`.
// Announce block 2 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_2_header).await;
// Collect the events of both transactions.
@@ -249,12 +242,12 @@ async fn tx_broadcast_stop_after_broadcast_finishes() {
let xt = hex_string(&uxt.encode());
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction
// pool.inner_pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
@@ -303,7 +296,7 @@ async fn tx_broadcast_stop_after_broadcast_finishes() {
// The operation ID is no longer valid, check that the broadcast future
// cleared out the inner state of the operation.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
.call::<_, serde_json::Value>("transaction_v1_stop", rpc_params![&operation_id])
.await
.unwrap_err();
assert_matches!(err,
@@ -328,14 +321,14 @@ async fn tx_broadcast_resubmits_invalid_tx() {
let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());
let _operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
let block_1_header = api.push_block(1, vec![], true);
let block_1 = block_1_header.hash();
// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
@@ -355,7 +348,7 @@ async fn tx_broadcast_resubmits_invalid_tx() {
pool.inner_pool.maintain(event).await;
assert_eq!(1, pool.inner_pool.status().ready);
// Ensure the `transaction_unstable_broadcast` is aware of the invalid transaction.
// Ensure the `transaction_v1_broadcast` is aware of the invalid transaction.
let event = get_next_event!(&mut pool_middleware);
// Because we have received an `Invalid` status, we try to broadcast the transaction with the
// next announced block.
@@ -388,7 +381,7 @@ async fn tx_broadcast_resubmits_invalid_tx() {
pool.inner_pool.maintain(event).await;
assert_eq!(0, pool.inner_pool.status().ready);
// Announce block to `transaction_unstable_broadcast`.
// Announce block to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_3_header).await;
let event = get_next_event!(&mut pool_middleware);
@@ -456,12 +449,10 @@ async fn tx_broadcast_resubmits_dropped_tx() {
// are immediately dropped.
api.set_priority(&current_uxt, 10);
let current_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&current_xt])
.await
.unwrap();
let current_operation_id: String =
tx_api.call("transaction_v1_broadcast", rpc_params![&current_xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
let block_1_header = api.push_block(1, vec![], true);
let event =
ChainEvent::Finalized { hash: block_1_header.hash(), tree_route: Arc::from(vec![]) };
@@ -480,10 +471,8 @@ async fn tx_broadcast_resubmits_dropped_tx() {
// The future tx has priority 2, smaller than the current 10.
api.set_priority(&future_uxt, 2);
let future_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&future_xt])
.await
.unwrap();
let future_operation_id: String =
tx_api.call("transaction_v1_broadcast", rpc_params![&future_xt]).await.unwrap();
assert_ne!(current_operation_id, future_operation_id);
let block_2_header = api.push_block(2, vec![], true);
@@ -535,12 +524,12 @@ async fn tx_broadcast_limit_reached() {
let xt = hex_string(&uxt.encode());
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
// Announce block 1 to `transaction_v1_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// Ensure the tx propagated from `transaction_v1_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
@@ -552,17 +541,14 @@ async fn tx_broadcast_limit_reached() {
assert_eq!(1, exec_middleware.num_tasks());
let operation_id_limit_reached: Option<String> =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
assert!(operation_id_limit_reached.is_none(), "No operation ID => tx was rejected");
// We still have in flight one operation.
assert_eq!(1, exec_middleware.num_tasks());
// Force the future to exit by calling stop.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
let _: () = tx_api.call("transaction_v1_stop", rpc_params![&operation_id]).await.unwrap();
// Ensure the broadcast future finishes.
let _ = get_next_event!(&mut exec_middleware.recv);
@@ -570,5 +556,5 @@ async fn tx_broadcast_limit_reached() {
// Can resubmit again now.
let _operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
tx_api.call("transaction_v1_broadcast", rpc_params![&xt]).await.unwrap();
}