tx: Remove tx_broadcast transaction from the pool (#4050)

This PR ensures that broadcast future cleans-up the submitted extrinsic
from the pool, iff the `broadcast_stop` operation has been called.

This effectively cleans-up transactions from the pool when the
`broadcast_stop` is called.

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2024-04-18 18:57:44 +03:00
committed by GitHub
parent 37e338f046
commit c891fdabf4
2 changed files with 31 additions and 18 deletions
@@ -94,7 +94,7 @@ async fn tx_broadcast_enters_pool() {
#[tokio::test] #[tokio::test]
async fn tx_broadcast_invalid_tx() { async fn tx_broadcast_invalid_tx() {
let (_, pool, _, tx_api, mut exec_middleware, _) = setup_api(Default::default()); let (_, pool, _, tx_api, exec_middleware, _) = setup_api(Default::default());
// Invalid parameters. // Invalid parameters.
let err = tx_api let err = tx_api
@@ -114,13 +114,10 @@ async fn tx_broadcast_invalid_tx() {
assert_eq!(0, pool.status().ready); assert_eq!(0, pool.status().ready);
// Await the broadcast future to exit. // The broadcast future should never be spawned when the tx decoding fails.
// Without this we'd be subject to races, where we try to call the stop before the tx is
// dropped.
let _ = get_next_event!(&mut exec_middleware.recv);
assert_eq!(0, exec_middleware.num_tasks()); assert_eq!(0, exec_middleware.num_tasks());
// The broadcast future was dropped, and the operation is no longer active. // The operation ID is no longer active.
// When the operation is not active, either from the tx being finalized or a // When the operation is not active, either from the tx being finalized or a
// terminal error; the stop method should return an error. // terminal error; the stop method should return an error.
let err = tx_api let err = tx_api
@@ -37,7 +37,7 @@ use std::{collections::HashMap, sync::Arc};
use super::error::ErrorBroadcast; use super::error::ErrorBroadcast;
/// An API for transaction RPC calls. /// An API for transaction RPC calls.
pub struct TransactionBroadcast<Pool, Client> { pub struct TransactionBroadcast<Pool: TransactionPool, Client> {
/// Substrate client. /// Substrate client.
client: Arc<Client>, client: Arc<Client>,
/// Transactions pool. /// Transactions pool.
@@ -45,16 +45,18 @@ pub struct TransactionBroadcast<Pool, Client> {
/// Executor to spawn subscriptions. /// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor, executor: SubscriptionTaskExecutor,
/// The broadcast operation IDs. /// The broadcast operation IDs.
broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState>>>, broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState<Pool>>>>,
} }
/// The state of a broadcast operation. /// The state of a broadcast operation.
struct BroadcastState { struct BroadcastState<Pool: TransactionPool> {
/// Handle to abort the running future that broadcasts the transaction. /// Handle to abort the running future that broadcasts the transaction.
handle: AbortHandle, handle: AbortHandle,
/// Associated tx hash.
tx_hash: <Pool as TransactionPool>::Hash,
} }
impl<Pool, Client> TransactionBroadcast<Pool, Client> { impl<Pool: TransactionPool, Client> TransactionBroadcast<Pool, Client> {
/// Creates a new [`TransactionBroadcast`]. /// Creates a new [`TransactionBroadcast`].
pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self { pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() } TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() }
@@ -106,17 +108,22 @@ where
// The unique ID of this operation. // The unique ID of this operation.
let id = self.generate_unique_id(); let id = self.generate_unique_id();
// The JSON-RPC server might check whether the transaction is valid before broadcasting it.
// If it does so and if the transaction is invalid, the server should silently do nothing
// and the JSON-RPC client is not informed of the problem. Invalid transactions should still
// count towards the limit to the number of simultaneously broadcasted transactions.
let Ok(decoded_extrinsic) = TransactionFor::<Pool>::decode(&mut &bytes[..]) else {
return Ok(Some(id));
};
// Save the tx hash to remove it later.
let tx_hash = pool.hash_of(&decoded_extrinsic);
let mut best_block_import_stream = let mut best_block_import_stream =
Box::pin(self.client.import_notification_stream().filter_map( Box::pin(self.client.import_notification_stream().filter_map(
|notification| async move { notification.is_new_best.then_some(notification.hash) }, |notification| async move { notification.is_new_best.then_some(notification.hash) },
)); ));
let broadcast_transaction_fut = async move { let broadcast_transaction_fut = async move {
// There is nothing we could do with an extrinsic of invalid format.
let Ok(decoded_extrinsic) = TransactionFor::<Pool>::decode(&mut &bytes[..]) else {
return;
};
// Flag to determine if the we should broadcast the transaction again. // Flag to determine if the we should broadcast the transaction again.
let mut is_done = false; let mut is_done = false;
@@ -169,17 +176,26 @@ where
let (fut, handle) = futures::future::abortable(broadcast_transaction_fut); let (fut, handle) = futures::future::abortable(broadcast_transaction_fut);
let broadcast_ids = self.broadcast_ids.clone(); let broadcast_ids = self.broadcast_ids.clone();
let drop_id = id.clone(); let drop_id = id.clone();
let pool = self.pool.clone();
// The future expected by the executor must be `Future<Output = ()>` instead of // The future expected by the executor must be `Future<Output = ()>` instead of
// `Future<Output = Result<(), Aborted>>`. // `Future<Output = Result<(), Aborted>>`.
let fut = fut.map(move |_| { let fut = fut.map(move |result| {
// Remove the entry from the broadcast IDs map. // Remove the entry from the broadcast IDs map.
broadcast_ids.write().remove(&drop_id); let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };
// The broadcast was not stopped.
if result.is_ok() {
return
}
// Best effort pool removal (tx can already be finalized).
pool.remove_invalid(&[broadcast_state.tx_hash]);
}); });
// Keep track of this entry and the abortable handle. // Keep track of this entry and the abortable handle.
{ {
let mut broadcast_ids = self.broadcast_ids.write(); let mut broadcast_ids = self.broadcast_ids.write();
broadcast_ids.insert(id.clone(), BroadcastState { handle }); broadcast_ids.insert(id.clone(), BroadcastState { handle, tx_hash });
} }
sc_rpc::utils::spawn_subscription_task(&self.executor, fut); sc_rpc::utils::spawn_subscription_task(&self.executor, fut);