rpc-v2/tx/tests: Add transaction broadcast tests and check propagated tx status (#3193)

This PR adds tests for the `transaction_broadcast` method.


The testing needs to coordinate the following components:
- The `TestApi` marks transactions as invalid and implements
`ChainApi::validate_transaction`
- this is what dictates if a transaction is valid or not and is called
from within the `BasicPool`
- The `BasicPool` which maintains the transactions and implements
`submit_and_watch` needed by the tx broadcast to submit the transaction
- The status of the transaction pool is exposed by mocking the BasicPool
- The `ChainHeadMockClient` which mocks the
`BlockchainEvents::import_notification_stream` needed by the tx
broadcast to know to which blocks the transaction is submitted

The following changes have been added to the substrate testing to
accommodate this:
- `TestApi` gets ` remove_invalid`, counterpart to `add_invalid` to
ensure an invalid transaction can become valid again; as well as a
priority setter for extrinsics
- `BasicPool` test constructor is extended with options for the
`PoolRotator`
- this mechanism is needed because transactions are banned for 30mins
(default) after they are declared invalid
  - testing bypasses this by providing a `Duration::ZERO`

### Testing Scenarios

- Capture the status of the transaction as it is normally broadcasted
- `transaction_stop` is valid while the transaction is in progress
- A future transaction is handled when the dependencies are completed
- Try to resubmit the transaction at a later block (currently invalid)
- An invalid transaction status is propagated; the transaction is marked
as temporarily banned; then the ban expires and transaction is
resubmitted
  
This builds on top of:
https://github.com/paritytech/polkadot-sdk/pull/3079
Part of: https://github.com/paritytech/polkadot-sdk/issues/3084

cc @paritytech/subxt-team

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: James Wilson <james@jsdw.me>
This commit is contained in:
Alexandru Vasile
2024-02-28 11:43:58 +02:00
committed by GitHub
parent 12ce4f7d04
commit f1b2189e83
10 changed files with 985 additions and 244 deletions
@@ -63,7 +63,7 @@ impl<Client> ChainHeadMockClient<Client> {
BlockImportNotification::new(header.hash(), BlockOrigin::Own, header, true, None, sink);
for sink in self.import_sinks.lock().iter_mut() {
sink.unbounded_send(notification.clone()).unwrap();
let _ = sink.unbounded_send(notification.clone());
}
}
@@ -83,7 +83,7 @@ impl<Client> ChainHeadMockClient<Client> {
let notification = FinalityNotification::from_summary(summary, sink);
for sink in self.finality_sinks.lock().iter_mut() {
sink.unbounded_send(notification.clone()).unwrap();
let _ = sink.unbounded_send(notification.clone());
}
}
}
@@ -1,238 +0,0 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use super::*;
use crate::{
chain_head::test_utils::ChainHeadMockClient, hex_string,
transaction::TransactionBroadcast as RpcTransactionBroadcast,
};
use assert_matches::assert_matches;
use codec::Encode;
use futures::Future;
use jsonrpsee::{rpc_params, MethodsError as Error, RpcModule};
use sc_transaction_pool::*;
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool};
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use std::{pin::Pin, sync::Arc, time::Duration};
use substrate_test_runtime_client::{prelude::*, AccountKeyring::*, Client};
use substrate_test_runtime_transaction_pool::{uxt, TestApi};
use tokio::sync::mpsc;
type Block = substrate_test_runtime_client::runtime::Block;
/// Wrap the `TaskExecutor` to know when the broadcast future is dropped.
#[derive(Clone)]
struct TaskExecutorBroadcast {
executor: TaskExecutor,
sender: mpsc::UnboundedSender<()>,
}
/// The channel that receives events when the broadcast futures are dropped.
type TaskExecutorRecv = mpsc::UnboundedReceiver<()>;
impl TaskExecutorBroadcast {
/// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures
/// are dropped.
fn new() -> (Self, TaskExecutorRecv) {
let (sender, recv) = mpsc::unbounded_channel();
(Self { executor: TaskExecutor::new(), sender }, recv)
}
}
impl SpawnNamed for TaskExecutorBroadcast {
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let future = Box::pin(async move {
future.await;
let _ = sender.send(());
});
self.executor.spawn(name, group, future)
}
fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let future = Box::pin(async move {
future.await;
let _ = sender.send(());
});
self.executor.spawn_blocking(name, group, future)
}
}
/// Initial Alice account nonce.
const ALICE_NONCE: u64 = 209;
fn create_basic_pool_with_genesis(
test_api: Arc<TestApi>,
) -> (BasicPool<TestApi, Block>, Pin<Box<dyn Future<Output = ()> + Send>>) {
let genesis_hash = {
test_api
.chain()
.read()
.block_by_number
.get(&0)
.map(|blocks| blocks[0].0.header.hash())
.expect("there is block 0. qed")
};
BasicPool::new_test(test_api, genesis_hash, genesis_hash)
}
fn maintained_pool() -> (BasicPool<TestApi, Block>, Arc<TestApi>, futures::executor::ThreadPool) {
let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE));
let (pool, background_task) = create_basic_pool_with_genesis(api.clone());
let thread_pool = futures::executor::ThreadPool::new().unwrap();
thread_pool.spawn_ok(background_task);
(pool, api, thread_pool)
}
fn setup_api() -> (
Arc<TestApi>,
Arc<BasicPool<TestApi, Block>>,
Arc<ChainHeadMockClient<Client<Backend>>>,
RpcModule<
TransactionBroadcast<BasicPool<TestApi, Block>, ChainHeadMockClient<Client<Backend>>>,
>,
TaskExecutorRecv,
) {
let (pool, api, _) = maintained_pool();
let pool = Arc::new(pool);
let builder = TestClientBuilder::new();
let client = Arc::new(builder.build());
let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));
let (task_executor, executor_recv) = TaskExecutorBroadcast::new();
let tx_api =
RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor))
.into_rpc();
(api, pool, client_mock, tx_api, executor_recv)
}
#[tokio::test]
async fn tx_broadcast_enters_pool() {
let (api, pool, client_mock, tx_api, _) = setup_api();
// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);
let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
// TODO: Improve testability by extending the `transaction_unstable_broadcast` with
// a middleware trait that intercepts the transaction status for testing.
let mut num_retries = 12;
while num_retries > 0 && pool.status().ready != 1 {
tokio::time::sleep(Duration::from_secs(5)).await;
num_retries -= 1;
}
assert_eq!(1, pool.status().ready);
assert_eq!(uxt.encode().len(), pool.status().ready_bytes);
// Import block 2 with the transaction included.
let block_2_header = api.push_block(2, vec![uxt.clone()], true);
let block_2 = block_2_header.hash();
// Announce block 2 to the pool.
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.maintain(event).await;
assert_eq!(0, pool.status().ready);
// Stop call can still be made.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
}
#[tokio::test]
async fn tx_broadcast_invalid_tx() {
let (_, pool, _, tx_api, mut exec_recv) = setup_api();
// Invalid parameters.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8])
.await
.unwrap_err();
assert_matches!(err,
Error::JsonRpc(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params"
);
assert_eq!(0, pool.status().ready);
// Invalid transaction that cannot be decoded. The broadcast silently exits.
let xt = "0xdeadbeef";
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
assert_eq!(0, pool.status().ready);
// Await the broadcast future to exit.
// Without this we'd be subject to races, where we try to call the stop before the tx is
// dropped.
exec_recv.recv().await.unwrap();
// The broadcast future was dropped, and the operation is no longer active.
// When the operation is not active, either from the tx being finalized or a
// terminal error; the stop method should return an error.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap_err();
assert_matches!(err,
Error::JsonRpc(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}
#[tokio::test]
async fn tx_invalid_stop() {
let (_, _, _, tx_api, _) = setup_api();
// Make an invalid stop call.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"])
.await
.unwrap_err();
assert_matches!(err,
Error::JsonRpc(err) if err.code() == super::error::json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}
@@ -0,0 +1,100 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
use std::sync::{atomic::AtomicUsize, Arc};
use tokio::sync::mpsc;
/// Wrap the `TaskExecutor` to know when the broadcast future is dropped.
#[derive(Clone)]
pub struct TaskExecutorBroadcast {
executor: TaskExecutor,
sender: mpsc::UnboundedSender<()>,
num_tasks: Arc<AtomicUsize>,
}
/// The channel that receives events when the broadcast futures are dropped.
pub type TaskExecutorRecv = mpsc::UnboundedReceiver<()>;
/// The state of the `TaskExecutorBroadcast`.
pub struct TaskExecutorState {
pub recv: TaskExecutorRecv,
pub num_tasks: Arc<AtomicUsize>,
}
impl TaskExecutorState {
pub fn num_tasks(&self) -> usize {
self.num_tasks.load(std::sync::atomic::Ordering::Acquire)
}
}
impl TaskExecutorBroadcast {
/// Construct a new `TaskExecutorBroadcast` and a receiver to know when the broadcast futures
/// are dropped.
pub fn new() -> (Self, TaskExecutorState) {
let (sender, recv) = mpsc::unbounded_channel();
let num_tasks = Arc::new(AtomicUsize::new(0));
(
Self { executor: TaskExecutor::new(), sender, num_tasks: num_tasks.clone() },
TaskExecutorState { recv, num_tasks },
)
}
}
impl SpawnNamed for TaskExecutorBroadcast {
fn spawn(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let num_tasks = self.num_tasks.clone();
let future = Box::pin(async move {
num_tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
future.await;
num_tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
let _ = sender.send(());
});
self.executor.spawn(name, group, future)
}
fn spawn_blocking(
&self,
name: &'static str,
group: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
let sender = self.sender.clone();
let num_tasks = self.num_tasks.clone();
let future = Box::pin(async move {
num_tasks.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
future.await;
num_tasks.fetch_sub(1, std::sync::atomic::Ordering::AcqRel);
let _ = sender.send(());
});
self.executor.spawn_blocking(name, group, future)
}
}
@@ -0,0 +1,187 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use codec::Encode;
use futures::Future;
use sc_transaction_pool::BasicPool;
use sc_transaction_pool_api::{
ImportNotificationStream, PoolFuture, PoolStatus, ReadyTransactions, TransactionFor,
TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
};
use crate::hex_string;
use futures::{FutureExt, StreamExt};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{collections::HashMap, pin::Pin, sync::Arc};
use substrate_test_runtime_transaction_pool::TestApi;
use tokio::sync::mpsc;
pub type Block = substrate_test_runtime_client::runtime::Block;
pub type TxTestPool = MiddlewarePool;
pub type TxStatusType<Pool> = sc_transaction_pool_api::TransactionStatus<
sc_transaction_pool_api::TxHash<Pool>,
sc_transaction_pool_api::BlockHash<Pool>,
>;
pub type TxStatusTypeTest = TxStatusType<TxTestPool>;
/// The type of the event that the middleware captures.
#[derive(Debug, PartialEq)]
pub enum MiddlewarePoolEvent {
TransactionStatus {
transaction: String,
status: sc_transaction_pool_api::TransactionStatus<
<Block as BlockT>::Hash,
<Block as BlockT>::Hash,
>,
},
PoolError {
transaction: String,
err: String,
},
}
/// The channel that receives events when the broadcast futures are dropped.
pub type MiddlewarePoolRecv = mpsc::UnboundedReceiver<MiddlewarePoolEvent>;
/// Add a middleware to the transaction pool.
///
/// This wraps the `submit_and_watch` to gain access to the events.
pub struct MiddlewarePool {
pub inner_pool: Arc<BasicPool<TestApi, Block>>,
/// Send the middleware events to the test.
sender: mpsc::UnboundedSender<MiddlewarePoolEvent>,
}
impl MiddlewarePool {
/// Construct a new [`MiddlewarePool`].
pub fn new(pool: Arc<BasicPool<TestApi, Block>>) -> (Self, MiddlewarePoolRecv) {
let (sender, recv) = mpsc::unbounded_channel();
(MiddlewarePool { inner_pool: pool, sender }, recv)
}
}
impl TransactionPool for MiddlewarePool {
type Block = <BasicPool<TestApi, Block> as TransactionPool>::Block;
type Hash = <BasicPool<TestApi, Block> as TransactionPool>::Hash;
type InPoolTransaction = <BasicPool<TestApi, Block> as TransactionPool>::InPoolTransaction;
type Error = <BasicPool<TestApi, Block> as TransactionPool>::Error;
fn submit_at(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xts: Vec<TransactionFor<Self>>,
) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
self.inner_pool.submit_at(at, source, xts)
}
fn submit_one(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> PoolFuture<TxHash<Self>, Self::Error> {
self.inner_pool.submit_one(at, source, xt)
}
fn submit_and_watch(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xt: TransactionFor<Self>,
) -> PoolFuture<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
let pool = self.inner_pool.clone();
let sender = self.sender.clone();
let transaction = hex_string(&xt.encode());
async move {
let watcher = match pool.submit_and_watch(at, source, xt).await {
Ok(watcher) => watcher,
Err(err) => {
let _ = sender.send(MiddlewarePoolEvent::PoolError {
transaction: transaction.clone(),
err: err.to_string(),
});
return Err(err);
},
};
let watcher = watcher.map(move |status| {
let sender = sender.clone();
let transaction = transaction.clone();
let _ = sender.send(MiddlewarePoolEvent::TransactionStatus {
transaction,
status: status.clone(),
});
status
});
Ok(watcher.boxed())
}
.boxed()
}
fn remove_invalid(&self, hashes: &[TxHash<Self>]) -> Vec<Arc<Self::InPoolTransaction>> {
self.inner_pool.remove_invalid(hashes)
}
fn status(&self) -> PoolStatus {
self.inner_pool.status()
}
fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
self.inner_pool.import_notification_stream()
}
fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
self.inner_pool.hash_of(xt)
}
fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
self.inner_pool.on_broadcasted(propagations)
}
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
self.inner_pool.ready_transaction(hash)
}
fn ready_at(
&self,
at: NumberFor<Self::Block>,
) -> Pin<
Box<
dyn Future<
Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
> + Send,
>,
> {
self.inner_pool.ready_at(at)
}
fn ready(&self) -> Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send> {
self.inner_pool.ready()
}
fn futures(&self) -> Vec<Self::InPoolTransaction> {
self.inner_pool.futures()
}
}
@@ -0,0 +1,24 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
mod executor;
mod middleware_pool;
#[macro_use]
mod setup;
mod transaction_broadcast_tests;
@@ -0,0 +1,120 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
chain_head::test_utils::ChainHeadMockClient,
transaction::{
api::TransactionBroadcastApiServer,
tests::executor::{TaskExecutorBroadcast, TaskExecutorState},
TransactionBroadcast as RpcTransactionBroadcast,
},
};
use futures::Future;
use jsonrpsee::RpcModule;
use sc_transaction_pool::*;
use std::{pin::Pin, sync::Arc};
use substrate_test_runtime_client::{prelude::*, Client};
use substrate_test_runtime_transaction_pool::TestApi;
use crate::transaction::tests::middleware_pool::{MiddlewarePool, MiddlewarePoolRecv};
pub type Block = substrate_test_runtime_client::runtime::Block;
/// Initial Alice account nonce.
pub const ALICE_NONCE: u64 = 209;
fn create_basic_pool_with_genesis(
test_api: Arc<TestApi>,
options: Options,
) -> (BasicPool<TestApi, Block>, Pin<Box<dyn Future<Output = ()> + Send>>) {
let genesis_hash = {
test_api
.chain()
.read()
.block_by_number
.get(&0)
.map(|blocks| blocks[0].0.header.hash())
.expect("there is block 0. qed")
};
BasicPool::new_test(test_api, genesis_hash, genesis_hash, options)
}
fn maintained_pool(
options: Options,
) -> (BasicPool<TestApi, Block>, Arc<TestApi>, futures::executor::ThreadPool) {
let api = Arc::new(TestApi::with_alice_nonce(ALICE_NONCE));
let (pool, background_task) = create_basic_pool_with_genesis(api.clone(), options);
let thread_pool = futures::executor::ThreadPool::new().unwrap();
thread_pool.spawn_ok(background_task);
(pool, api, thread_pool)
}
pub fn setup_api(
options: Options,
) -> (
Arc<TestApi>,
Arc<MiddlewarePool>,
Arc<ChainHeadMockClient<Client<Backend>>>,
RpcModule<RpcTransactionBroadcast<MiddlewarePool, ChainHeadMockClient<Client<Backend>>>>,
TaskExecutorState,
MiddlewarePoolRecv,
) {
let (pool, api, _) = maintained_pool(options);
let (pool, pool_state) = MiddlewarePool::new(Arc::new(pool).clone());
let pool = Arc::new(pool);
let builder = TestClientBuilder::new();
let client = Arc::new(builder.build());
let client_mock = Arc::new(ChainHeadMockClient::new(client.clone()));
let (task_executor, executor_recv) = TaskExecutorBroadcast::new();
let tx_api =
RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor))
.into_rpc();
(api, pool, client_mock, tx_api, executor_recv, pool_state)
}
/// Get the next event from the provided middleware in at most 5 seconds.
macro_rules! get_next_event {
($middleware:expr) => {
tokio::time::timeout(std::time::Duration::from_secs(5), $middleware.recv())
.await
.unwrap()
.unwrap()
};
}
/// Collect the next number of transaction events from the provided middleware.
macro_rules! get_next_tx_events {
($middleware:expr, $num:expr) => {{
let mut events = std::collections::HashMap::new();
for _ in 0..$num {
let event = get_next_event!($middleware);
match event {
crate::transaction::tests::middleware_pool::MiddlewarePoolEvent::TransactionStatus { transaction, status } => {
events.entry(transaction).or_insert_with(|| vec![]).push(status);
},
other => panic!("Expected TransactionStatus, received {:?}", other),
};
}
events
}};
}
@@ -0,0 +1,523 @@
// This file is part of Substrate.
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{hex_string, transaction::error::json_rpc_spec};
use assert_matches::assert_matches;
use codec::Encode;
use jsonrpsee::{rpc_params, MethodsError as Error};
use sc_transaction_pool::{Options, PoolLimit};
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionPool};
use std::sync::Arc;
use substrate_test_runtime_client::AccountKeyring::*;
use substrate_test_runtime_transaction_pool::uxt;
// Test helpers.
use crate::transaction::tests::{
middleware_pool::{MiddlewarePoolEvent, TxStatusTypeTest},
setup::{setup_api, ALICE_NONCE},
};
#[tokio::test]
async fn tx_broadcast_enters_pool() {
let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(Default::default());
// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);
let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::Ready
}
);
assert_eq!(1, pool.inner_pool.status().ready);
assert_eq!(uxt.encode().len(), pool.inner_pool.status().ready_bytes);
// Import block 2 with the transaction included.
let block_2_header = api.push_block(2, vec![uxt.clone()], true);
let block_2 = block_2_header.hash();
// Announce block 2 to the pool.
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.inner_pool.maintain(event).await;
assert_eq!(0, pool.inner_pool.status().ready);
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::InBlock((block_2, 0))
}
);
// The future broadcast awaits for the finalized status to be reached.
// Force the future to exit by calling stop.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
// Ensure the broadcast future finishes.
let _ = get_next_event!(&mut exec_middleware.recv);
assert_eq!(0, exec_middleware.num_tasks());
}
#[tokio::test]
async fn tx_broadcast_invalid_tx() {
let (_, pool, _, tx_api, mut exec_middleware, _) = setup_api(Default::default());
// Invalid parameters.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_broadcast", [1u8])
.await
.unwrap_err();
assert_matches!(err,
Error::JsonRpc(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid params"
);
assert_eq!(0, pool.status().ready);
// Invalid transaction that cannot be decoded. The broadcast silently exits.
let xt = "0xdeadbeef";
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
assert_eq!(0, pool.status().ready);
// Await the broadcast future to exit.
// Without this we'd be subject to races, where we try to call the stop before the tx is
// dropped.
let _ = get_next_event!(&mut exec_middleware.recv);
assert_eq!(0, exec_middleware.num_tasks());
// The broadcast future was dropped, and the operation is no longer active.
// When the operation is not active, either from the tx being finalized or a
// terminal error; the stop method should return an error.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap_err();
assert_matches!(err,
Error::JsonRpc(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}
#[tokio::test]
async fn tx_stop_with_invalid_operation_id() {
let (_, _, _, tx_api, _, _) = setup_api(Default::default());
// Make an invalid stop call.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", ["invalid_operation_id"])
.await
.unwrap_err();
assert_matches!(err,
Error::JsonRpc(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}
#[tokio::test]
async fn tx_broadcast_resubmits_future_nonce_tx() {
let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(Default::default());
// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);
let block_1 = block_1_header.hash();
let current_uxt = uxt(Alice, ALICE_NONCE);
let current_xt = hex_string(&current_uxt.encode());
// This lives in the future.
let future_uxt = uxt(Alice, ALICE_NONCE + 1);
let future_xt = hex_string(&future_uxt.encode());
let future_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&future_xt])
.await
.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: future_xt.clone(),
status: TxStatusTypeTest::Future
}
);
let event = ChainEvent::NewBestBlock { hash: block_1, tree_route: None };
pool.inner_pool.maintain(event).await;
assert_eq!(0, pool.inner_pool.status().ready);
// Ensure the tx is in the future.
assert_eq!(1, pool.inner_pool.status().future);
let block_2_header = api.push_block(2, vec![], true);
let block_2 = block_2_header.hash();
let operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&current_xt])
.await
.unwrap();
assert_ne!(future_operation_id, operation_id);
// Announce block 2 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_2_header).await;
// Collect the events of both transactions.
let events = get_next_tx_events!(&mut pool_middleware, 2);
// Transactions entered the ready queue.
assert_eq!(events.get(&current_xt).unwrap(), &vec![TxStatusTypeTest::Ready]);
assert_eq!(events.get(&future_xt).unwrap(), &vec![TxStatusTypeTest::Ready]);
let event = ChainEvent::NewBestBlock { hash: block_2, tree_route: None };
pool.inner_pool.maintain(event).await;
assert_eq!(2, pool.inner_pool.status().ready);
assert_eq!(0, pool.inner_pool.status().future);
// Finalize transactions.
let block_3_header = api.push_block(3, vec![current_uxt, future_uxt], true);
let block_3 = block_3_header.hash();
client_mock.trigger_import_stream(block_3_header).await;
let event = ChainEvent::Finalized { hash: block_3, tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;
assert_eq!(0, pool.inner_pool.status().ready);
assert_eq!(0, pool.inner_pool.status().future);
let events = get_next_tx_events!(&mut pool_middleware, 4);
assert_eq!(
events.get(&current_xt).unwrap(),
&vec![TxStatusTypeTest::InBlock((block_3, 0)), TxStatusTypeTest::Finalized((block_3, 0))]
);
assert_eq!(
events.get(&future_xt).unwrap(),
&vec![TxStatusTypeTest::InBlock((block_3, 1)), TxStatusTypeTest::Finalized((block_3, 1))]
);
// Both broadcast futures must exit.
let _ = get_next_event!(&mut exec_middleware.recv);
let _ = get_next_event!(&mut exec_middleware.recv);
assert_eq!(0, exec_middleware.num_tasks());
}
/// This test is similar to `tx_broadcast_enters_pool`
/// However the last block is announced as finalized to force the
/// broadcast future to exit before the `stop` is called.
#[tokio::test]
async fn tx_broadcast_stop_after_broadcast_finishes() {
let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(Default::default());
// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);
let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction
// pool.inner_pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::Ready
}
);
assert_eq!(1, pool.inner_pool.status().ready);
assert_eq!(uxt.encode().len(), pool.inner_pool.status().ready_bytes);
// Import block 2 with the transaction included.
let block_2_header = api.push_block(2, vec![uxt.clone()], true);
let block_2 = block_2_header.hash();
// Announce block 2 to the pool.inner_pool.
let event = ChainEvent::Finalized { hash: block_2, tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;
assert_eq!(0, pool.inner_pool.status().ready);
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::InBlock((block_2, 0))
}
);
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::Finalized((block_2, 0))
}
);
// Ensure the broadcast future terminated properly.
let _ = get_next_event!(&mut exec_middleware.recv);
assert_eq!(0, exec_middleware.num_tasks());
// The operation ID is no longer valid, check that the broadcast future
// cleared out the inner state of the operation.
let err = tx_api
.call::<_, serde_json::Value>("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap_err();
assert_matches!(err,
Error::JsonRpc(err) if err.code() == json_rpc_spec::INVALID_PARAM_ERROR && err.message() == "Invalid operation id"
);
}
#[tokio::test]
async fn tx_broadcast_resubmits_invalid_tx() {
let limits = PoolLimit { count: 8192, total_bytes: 20 * 1024 * 1024 };
let options = Options {
ready: limits.clone(),
future: limits,
reject_future_transactions: false,
// This ensures that a transaction is not banned.
ban_time: std::time::Duration::ZERO,
};
let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(options);
let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());
let _operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
let block_1_header = api.push_block(1, vec![], true);
let block_1 = block_1_header.hash();
// Announce block 1 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::Ready,
}
);
assert_eq!(1, pool.inner_pool.status().ready);
assert_eq!(uxt.encode().len(), pool.inner_pool.status().ready_bytes);
// Mark the transaction as invalid from the API, causing a temporary ban.
api.add_invalid(&uxt);
// Push an event to the pool to ensure the transaction is excluded.
let event = ChainEvent::NewBestBlock { hash: block_1, tree_route: None };
pool.inner_pool.maintain(event).await;
assert_eq!(1, pool.inner_pool.status().ready);
// Ensure the `transaction_unstable_broadcast` is aware of the invalid transaction.
let event = get_next_event!(&mut pool_middleware);
// Because we have received an `Invalid` status, we try to broadcast the transaction with the
// next announced block.
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::Invalid
}
);
// Import block 2.
let block_2_header = api.push_block(2, vec![], true);
client_mock.trigger_import_stream(block_2_header).await;
// Ensure we propagate the temporary ban error to `submit_and_watch`.
// This ensures we'll loop again with the next annmounced block and try to resubmit the
// transaction. The transaction remains temporarily banned until the pool is maintained.
let event = get_next_event!(&mut pool_middleware);
assert_matches!(event, MiddlewarePoolEvent::PoolError { transaction, err } if transaction == xt && err.contains("Transaction temporarily Banned"));
// Import block 3.
let block_3_header = api.push_block(3, vec![], true);
let block_3 = block_3_header.hash();
// Remove the invalid transaction from the pool to allow it to pass through.
api.remove_invalid(&uxt);
let event = ChainEvent::NewBestBlock { hash: block_3, tree_route: None };
// We have to maintain the pool to ensure the transaction is no longer invalid.
// This clears out the banned transactions.
pool.inner_pool.maintain(event).await;
assert_eq!(0, pool.inner_pool.status().ready);
// Announce block to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_3_header).await;
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::Ready,
}
);
assert_eq!(1, pool.inner_pool.status().ready);
let block_4_header = api.push_block(4, vec![uxt], true);
let block_4 = block_4_header.hash();
let event = ChainEvent::Finalized { hash: block_4, tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::InBlock((block_4, 0)),
}
);
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::Finalized((block_4, 0)),
}
);
// Ensure the broadcast future terminated properly.
let _ = get_next_event!(&mut exec_middleware.recv);
assert_eq!(0, exec_middleware.num_tasks());
}
/// This is similar to `tx_broadcast_resubmits_invalid_tx`.
/// However, it forces the tx to be resubmited because of the pool
/// limits. Which is a different code path than the invalid tx.
#[tokio::test]
async fn tx_broadcast_resubmits_dropped_tx() {
let limits = PoolLimit { count: 1, total_bytes: 1000 };
let options = Options {
ready: limits.clone(),
future: limits,
reject_future_transactions: false,
// This ensures that a transaction is not banned.
ban_time: std::time::Duration::ZERO,
};
let (api, pool, client_mock, tx_api, _, mut pool_middleware) = setup_api(options);
let current_uxt = uxt(Alice, ALICE_NONCE);
let current_xt = hex_string(&current_uxt.encode());
// This lives in the future.
let future_uxt = uxt(Alice, ALICE_NONCE + 1);
let future_xt = hex_string(&future_uxt.encode());
// By default the `validate_transaction` mock uses priority 1 for
// transactions. Bump the priority to ensure other transactions
// are immediately dropped.
api.set_priority(&current_uxt, 10);
let current_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&current_xt])
.await
.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
let block_1_header = api.push_block(1, vec![], true);
let event =
ChainEvent::Finalized { hash: block_1_header.hash(), tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;
client_mock.trigger_import_stream(block_1_header).await;
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: current_xt.clone(),
status: TxStatusTypeTest::Ready,
}
);
assert_eq!(1, pool.inner_pool.status().ready);
// The future tx has priority 2, smaller than the current 10.
api.set_priority(&future_uxt, 2);
let future_operation_id: String = tx_api
.call("transaction_unstable_broadcast", rpc_params![&future_xt])
.await
.unwrap();
assert_ne!(current_operation_id, future_operation_id);
let block_2_header = api.push_block(2, vec![], true);
let event =
ChainEvent::Finalized { hash: block_2_header.hash(), tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;
client_mock.trigger_import_stream(block_2_header).await;
// We must have at most 1 transaction in the pool, as per limits above.
assert_eq!(1, pool.inner_pool.status().ready);
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::PoolError {
transaction: future_xt.clone(),
err: "Transaction couldn't enter the pool because of the limit".into()
}
);
let block_3_header = api.push_block(3, vec![current_uxt], true);
let event =
ChainEvent::Finalized { hash: block_3_header.hash(), tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;
client_mock.trigger_import_stream(block_3_header.clone()).await;
// The first tx is in a finalzied block; the future tx must enter the pool.
let events = get_next_tx_events!(&mut pool_middleware, 3);
assert_eq!(
events.get(&current_xt).unwrap(),
&vec![
TxStatusTypeTest::InBlock((block_3_header.hash(), 0)),
TxStatusTypeTest::Finalized((block_3_header.hash(), 0))
]
);
// The dropped transaction was resubmitted.
assert_eq!(events.get(&future_xt).unwrap(), &vec![TxStatusTypeTest::Ready]);
}