snapshot before rebranding

This commit is contained in:
2025-12-14 07:37:21 +03:00
parent 5520d491a5
commit 09735eb97a
1752 changed files with 58116 additions and 15986 deletions
+59
View File
@@ -0,0 +1,59 @@
[package]
name = "bizinikiwi-txtesttool"
version = "0.7.0"
description = "Bizinikiwi utility: A library and CLI tool for sending transactions to Pezkuwi blockchain, enabling developers to test and monitor transaction scenarios."
documentation = "https://docs.rs/bizinikiwi-txtesttool"
license = "Apache-2.0 OR GPL-3.0"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
repository.workspace = true
[[bin]]
name = "txtt"
path = "bin/main.rs"
[dependencies]
async-trait = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
jsonrpsee = { workspace = true, features = [
"async-client",
"client-web-transport",
"jsonrpsee-types",
] }
average = "0.15.1"
chrono = { workspace = true }
clap = { workspace = true, features = ["derive"] }
clap_derive = "4.0.0-rc.1"
ctrlc = "3.4.4"
hex = { workspace = true, default-features = true }
codec = { workspace = true }
parking_lot = { workspace = true, default-features = true }
rand = { workspace = true, default-features = true }
serde = { workspace = true, default-features = true }
serde_json = { workspace = true, features = ["arbitrary_precision"] }
subxt = { workspace = true, default-features = true }
subxt-core = { workspace = true, default-features = true }
subxt-rpcs = { workspace = true, default-features = true }
subxt-signer = { workspace = true, features = ["unstable-eth"] }
termplot = "0.1.1"
thiserror = { workspace = true }
time = { version = "0.3.36", features = [
"formatting",
"local-offset",
"macros",
] }
tokio = { workspace = true, features = [
"macros",
"rt-multi-thread",
"sync",
"time",
] }
tokio-util = { workspace = true, features = ["compat"] }
tracing = { workspace = true, default-features = true }
tracing-subscriber = { workspace = true, features = [
"env-filter",
"fmt",
"time",
] }
+290
View File
@@ -0,0 +1,290 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
#![allow(dead_code)]
#![allow(unused_variables)]
use clap::Parser;
use codec::Compact;
use std::{fs, fs::File, io::BufReader, time::Duration};
use bizinikiwi_txtesttool::{
block_monitor::BlockMonitor,
cli::{Cli, CliCommand},
execution_log::{journal::Journal, make_stats, STAT_TARGET},
init_logger,
runner::DefaultTxTask,
scenario::{AccountsDescription, ChainType, ScenarioBuilder, ScenarioType},
subxt_transaction::{
self, generate_ecdsa_keypair, generate_sr25519_keypair, EthRuntimeConfig, EthTransaction,
EthTransactionsSink, SubstrateTransaction, SubstrateTransactionsSink, SENDER_SEED,
},
};
use subxt::{ext::frame_metadata::RuntimeMetadataPrefixed, PolkadotConfig};
use tracing::info;
macro_rules! populate_scenario_builder {
($scenario_builder:expr, $scenario_type:expr) => {{
match $scenario_type {
ScenarioType::OneShot { account, nonce } =>
$scenario_builder.with_account_id(account.clone()).with_nonce_from(*nonce),
ScenarioType::FromSingleAccount { account, from, count } => $scenario_builder
.with_account_id(account.clone())
.with_nonce_from(*from)
.with_txs_count(*count),
ScenarioType::FromManyAccounts { start_id, last_id, from, count } => $scenario_builder
.with_start_id(*start_id)
.with_last_id(*last_id)
.with_nonce_from(*from)
.with_txs_count(*count),
}
}};
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
init_logger();
let cli = Cli::parse();
match &cli.command {
CliCommand::Tx {
chain,
ws,
unwatched,
block_monitor,
mortal,
log_file,
scenario,
send_threshold,
remark,
tip,
use_legacy_backend,
} => match chain {
ChainType::Fake => {
unimplemented!()
},
ChainType::Eth => {
let mut scenario_builder = ScenarioBuilder::new()
.with_rpc_uri(ws.to_string())
.with_chain_type(chain.clone())
.with_send_threshold(*send_threshold as usize)
.with_block_monitoring(*block_monitor)
.with_watched_txs(!unwatched)
.with_installed_ctrlc_stop_hook(true)
.with_legacy_backend(*use_legacy_backend)
.with_tip(*tip);
scenario_builder = populate_scenario_builder!(scenario_builder, scenario);
if let Some(mortality) = mortal {
scenario_builder = scenario_builder.with_mortality(*mortality);
}
if let Some(inner) = remark {
scenario_builder = scenario_builder.with_remark_recipe(*inner);
}
let scenario_executor = scenario_builder.build().await;
let _ = scenario_executor.execute().await;
},
ChainType::Sub => {
let mut scenario_builder = ScenarioBuilder::new()
.with_rpc_uri(ws.to_string())
.with_chain_type(chain.clone())
.with_send_threshold(*send_threshold as usize)
.with_block_monitoring(*block_monitor)
.with_watched_txs(!unwatched)
.with_installed_ctrlc_stop_hook(true)
.with_legacy_backend(*use_legacy_backend)
.with_tip(*tip);
scenario_builder = populate_scenario_builder!(scenario_builder, scenario);
if let Some(mortality) = mortal {
scenario_builder = scenario_builder.with_mortality(*mortality);
}
if let Some(inner) = remark {
scenario_builder = scenario_builder.with_remark_recipe(*inner);
}
let scenario_executor = scenario_builder.build().await;
let _ = scenario_executor.execute().await;
},
},
CliCommand::CheckNonce { chain, ws, account } => {
match chain {
ChainType::Fake => {
panic!("check nonce not supported for fake chain");
},
ChainType::Eth => {
let desc = if let Ok(id) = account.parse::<u32>() {
AccountsDescription::Derived(id..id + 1)
} else {
AccountsDescription::Keyring(account.clone())
};
let sink = EthTransactionsSink::new_with_uri_with_accounts_description(
ws,
desc,
generate_ecdsa_keypair,
None,
false,
)
.await;
let account =
sink.get_from_account_id(account).ok_or("account shall be correct")?;
let nonce = sink.check_account_nonce(account).await?;
info!(target:STAT_TARGET, "{nonce:?}");
},
ChainType::Sub => {
let desc = if let Ok(id) = account.parse::<u32>() {
AccountsDescription::Derived(id..id + 1)
} else {
AccountsDescription::Keyring(account.clone())
};
let sink = SubstrateTransactionsSink::new_with_uri_with_accounts_description(
ws,
desc,
generate_sr25519_keypair,
None,
false,
)
.await;
let account =
sink.get_from_account_id(account).ok_or("account shall be correct")?;
let nonce = sink.check_account_nonce(account).await?;
info!(target:STAT_TARGET, "{nonce:?}");
},
};
},
CliCommand::Metadata { ws } => {
// Handle metadata command
use codec::Decode;
let api = subxt::OnlineClient::<EthRuntimeConfig>::from_insecure_url(ws).await?;
let runtime_apis = api.runtime_api().at_latest().await?;
let raw_bytes: Vec<u8> = runtime_apis.call_raw("Metadata_metadata", None).await?;
let (_, meta): (Compact<u32>, RuntimeMetadataPrefixed) =
Decode::decode(&mut &raw_bytes[..]).map_err(|e| format!("Decode error: {e}"))?;
println!("{meta:#?}");
},
CliCommand::BlockMonitor { chain, ws, display } => {
match chain {
ChainType::Sub => {
let block_monitor =
BlockMonitor::<PolkadotConfig>::new_with_options(ws, *display).await;
async {
loop {
tokio::time::sleep(Duration::from_secs(10)).await
}
}
.await;
},
ChainType::Eth => {
let block_monitor =
BlockMonitor::<EthRuntimeConfig>::new_with_options(ws, *display).await;
async {
loop {
tokio::time::sleep(Duration::from_secs(10)).await
}
}
.await;
},
ChainType::Fake => {
unimplemented!()
},
};
},
CliCommand::LoadLog { chain, log_file, show_graphs, out_csv_filename, .. } => match chain {
ChainType::Sub => {
let logs = Journal::<DefaultTxTask<SubstrateTransaction>>::load_logs(
log_file,
out_csv_filename,
);
make_stats(logs.values().cloned(), *show_graphs);
},
ChainType::Eth => {
let logs =
Journal::<DefaultTxTask<EthTransaction>>::load_logs(log_file, out_csv_filename);
make_stats(logs.values().cloned(), *show_graphs);
},
ChainType::Fake => {
unimplemented!()
},
},
CliCommand::GenerateEndowedAccounts {
chain,
start_id,
last_id,
balance,
out_file_name,
chain_spec,
} => {
let accounts_description = AccountsDescription::Derived(*start_id..last_id + 1);
let funded_accounts = match chain {
ChainType::Sub => {
let accounts = subxt_transaction::derive_accounts::<PolkadotConfig, _, _>(
accounts_description.clone(),
SENDER_SEED,
generate_sr25519_keypair,
);
accounts
.values()
.map(|keypair| {
serde_json::json!((
<PolkadotConfig as subxt::Config>::AccountId::from(
keypair.0.clone().public_key()
),
balance,
))
})
.collect::<Vec<_>>()
},
ChainType::Eth => {
let accounts = subxt_transaction::derive_accounts::<EthRuntimeConfig, _, _>(
accounts_description.clone(),
SENDER_SEED,
generate_ecdsa_keypair,
);
accounts
.values()
.map(|keypair| {
serde_json::json!((
"0x".to_string() + &hex::encode(keypair.0.clone().public_key()),
balance,
))
})
.collect::<Vec<_>>()
},
ChainType::Fake => Default::default(),
};
if let Some(chain_spec) = chain_spec {
let file = File::open(chain_spec)?;
let reader = BufReader::new(file);
let mut chain_spec: serde_json::Value = serde_json::from_reader(reader)?;
if let Some(balances) = chain_spec["genesis"]["runtimeGenesis"]["patch"]["balances"]
["balances"]
.as_array_mut()
{
balances.extend(funded_accounts);
} else {
return Err("Balances array not found in provided chain-spec".into());
}
fs::write(
out_file_name,
serde_json::to_string_pretty(&chain_spec)
.map_err(|e| format!("to pretty failed: {e}"))?,
)
.map_err(|err| err.to_string())?;
} else {
let json_object = serde_json::json!({"balances":{"balances":funded_accounts}});
fs::write(out_file_name, serde_json::to_string_pretty(&json_object)?.as_bytes())?;
}
},
};
Ok(())
}
@@ -0,0 +1,190 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use std::{
collections::{HashMap, HashSet},
pin::Pin,
};
use crate::{error::Error, transaction::TransactionMonitor};
use async_trait::async_trait;
use clap::ValueEnum;
use futures::Future;
use subxt::{blocks::Block, OnlineClient};
use subxt_core::config::Header;
use tokio::{
select,
sync::{mpsc, oneshot},
};
use tracing::{info, trace};
/// Monitors if transactions are part of finalized blocks and notifies external listeners when
/// found.
pub type BlockMonitorTask = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Receiving end of a channel used by the runner to get notified with the block hash of the
/// finalized block that contains unwatched transactions.
type TxFoundListener<H> = oneshot::Receiver<Result<H, Error>>;
/// Sending end of a channel used by the block monitor to send block hash notifications to a runner
/// waiting for unwatched transaction finalization.
type TxFoundListenerTrigger<H> = oneshot::Sender<Result<H, Error>>;
/// Information used by block monitor to create a listener for unwatched transactions finalization.
/// It is based on a tuple containing (transaction hash, maybe_mortality, channel sending end).
type ListenerInfo<C> = (HashOf<C>, Option<u64>, TxFoundListenerTrigger<HashOf<C>>);
/// Receiving end of a channel used by block monitor to register unwatched transactions
/// finalization listener.
type TxSubmissionListener<C> = mpsc::Receiver<ListenerInfo<C>>;
/// Sending end of a channel used by the runner to submit to the block monitor a listener for
/// unwatched transactions finalization.
type TxSubmissionSender<C> = mpsc::Sender<ListenerInfo<C>>;
type HashOf<C> = <<C as subxt::Config>::Hasher as subxt::config::Hasher>::Output;
#[derive(ValueEnum, Copy, Clone, Debug)]
pub enum BlockMonitorDisplayOptions {
Best,
Finalized,
All,
}
impl BlockMonitorDisplayOptions {
fn display_best(&self) -> bool {
matches!(self, Self::Best | Self::All)
}
fn display_finalized(&self) -> bool {
matches!(self, Self::Finalized | Self::All)
}
}
#[derive(Clone)]
pub struct BlockMonitor<C: subxt::Config> {
listener_request_tx: TxSubmissionSender<C>,
}
#[async_trait]
impl<C: subxt::Config> TransactionMonitor<HashOf<C>> for BlockMonitor<C> {
async fn wait(&self, tx_hash: HashOf<C>, until: Option<u64>) -> Result<HashOf<C>, Error> {
let listener = self.register_listener(tx_hash, until).await;
listener.await.map_err(|err| {
Error::Other(format!("failed while waiting for tx finalization: {err}"))
})?
}
}
impl<C: subxt::Config> BlockMonitor<C> {
/// Instantiates a [`BlockMonitor`].
pub async fn new(uri: &str) -> Self {
trace!(uri, "BlockNumber::new");
let api = OnlineClient::<C>::from_insecure_url(uri)
.await
.expect("should connect to rpc client");
let (listener_request_tx, rx) = mpsc::channel(100);
tokio::spawn(async { Self::run(api, rx, BlockMonitorDisplayOptions::All).await });
Self { listener_request_tx }
}
pub async fn new_with_options(uri: &str, options: BlockMonitorDisplayOptions) -> Self {
trace!(uri, "BlockNumber::new");
let api = OnlineClient::<C>::from_insecure_url(uri)
.await
.expect("should connect to rpc client");
let (listener_request_tx, rx) = mpsc::channel(100);
tokio::spawn(async move { Self::run(api, rx, options).await });
Self { listener_request_tx }
}
/// Returns the receiving end of a channel where a notification is sent if the transaction with
/// the given hash is found in a finalized block.
pub async fn register_listener(
&self,
h: HashOf<C>,
valid_until: Option<u64>,
) -> TxFoundListener<HashOf<C>> {
trace!(hash = ?h, "register_listener");
let (tx, external_listener) = oneshot::channel();
self.listener_request_tx.send((h, valid_until, tx)).await.unwrap();
external_listener
}
async fn handle_block(
callbacks: &mut HashMap<HashOf<C>, TxFoundListenerTrigger<HashOf<C>>>,
mortal_accounting: &mut HashMap<u64, HashSet<HashOf<C>>>,
block: Block<C, OnlineClient<C>>,
options: BlockMonitorDisplayOptions,
finalized: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let block_number: u64 = block.header().number().into();
let block_hash = block.hash();
let extrinsics = block.extrinsics().await?;
let extrinsics_count = extrinsics.len();
if finalized {
for ext in extrinsics.iter() {
let hash = ext.hash();
if let Some(trigger) = callbacks.remove(&hash) {
trace!(?hash, "found transaction, notifying");
trigger.send(Ok(block_hash)).unwrap();
}
}
mortal_accounting.remove(&block_number).into_iter().for_each(|txs| {
for tx in txs {
if let Some(trigger) = callbacks.remove(&tx) {
info!(hash = ?tx, block_number, "mortal transaction lifetime ends");
trigger.send(Err(Error::MortalLifetimeSurpassed(block_number))).unwrap();
}
}
});
if options.display_finalized() {
info!(block_number, extrinsics_count, "FINALIZED block");
}
} else if options.display_best() {
info!(block_number, extrinsics_count, " BEST block");
}
Ok(())
}
async fn block_monitor_inner(
api: OnlineClient<C>,
mut listener_request_rx: TxSubmissionListener<C>,
options: BlockMonitorDisplayOptions,
) -> Result<(), Box<dyn std::error::Error>> {
let mut finalized_blocks_sub = api.blocks().subscribe_finalized().await?;
let mut best_blocks_sub = api.blocks().subscribe_best().await?;
let mut callbacks = HashMap::<HashOf<C>, TxFoundListenerTrigger<HashOf<C>>>::new();
let mut mortal_accounting = HashMap::<u64, HashSet<HashOf<C>>>::new();
loop {
select! {
Some(Ok(block)) = finalized_blocks_sub.next() => {
Self::handle_block(&mut callbacks, &mut mortal_accounting, block, options, true).await?;
}
Some(Ok(block)) = best_blocks_sub.next() => {
Self::handle_block(&mut callbacks, &mut mortal_accounting, block, options, false).await?;
}
Some((hash, valid_until, tx)) = listener_request_rx.recv() => {
trace!("listener_request: {:?}", hash);
callbacks.insert(hash, tx);
if let Some(till) = valid_until {
mortal_accounting
.entry(till)
.or_default()
.insert(hash);
}
}
}
}
}
async fn run(
api: OnlineClient<C>,
listener_requrest_rx: TxSubmissionListener<C>,
options: BlockMonitorDisplayOptions,
) {
let _ = Self::block_monitor_inner(api, listener_requrest_rx, options).await;
}
}
+129
View File
@@ -0,0 +1,129 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::{
block_monitor::BlockMonitorDisplayOptions,
scenario::{ChainType, ScenarioType},
};
use clap::{Parser, Subcommand};
#[derive(Parser)]
#[clap(name = "txtt")]
pub struct Cli {
#[clap(subcommand)]
pub command: CliCommand,
}
#[derive(Subcommand)]
pub enum CliCommand {
Tx {
/// The type of chain to be used.
#[clap(long, default_value = "sub")]
chain: ChainType,
/// The RPC endpoint of the node to be used.
#[clap(long, default_value = "ws://127.0.0.1:9933")]
ws: String,
/// Send transaction with event listener (submit_and_watch).
#[clap(long)]
unwatched: bool,
/// Spawn block monitor for checking if transactions are included in finalized blocks.
#[clap(long)]
block_monitor: bool,
/// Use mortal transactions. This represents the number of blocks the mortal tx is valid
/// for, starting with the current finalized block at the time of tx creation.
#[clap(long)]
mortal: Option<u64>,
/// Send transactions threshold, sends the batch when number of pedning extrinsics drops
/// below this number.
#[clap(long, default_value_t = 10000)]
send_threshold: u32,
/// Override log file name (out_yyyymmdd_hhmmss.json)
#[clap(long)]
log_file: Option<String>,
/// Use remark command with given size in kbytes. If not given transfer transaction will be
/// sent.
#[clap(long)]
remark: Option<u32>,
/// Transaction tip (allows to control prio)
#[clap(long, default_value_t = 0)]
tip: u128,
/// Accounts range used for building/seding transactions.
#[clap(subcommand)]
scenario: ScenarioType,
/// Use legacy backend
#[clap(long, default_value_t = false)]
use_legacy_backend: bool,
},
/// Check nonce for given account.
CheckNonce {
/// The type of chain to be used.
#[clap(long, default_value = "sub")]
chain: ChainType,
/// The RPC endpoint of the node to be used.
#[clap(long, default_value = "ws://127.0.0.1:9933")]
ws: String,
/// Account identifier to be used. It can be keyring account (alice, bob,...) or index of
/// pre-funded account index used for derivation.
#[clap(long)]
account: String,
},
/// Download and display the metadata.
Metadata {
/// The RPC endpoint of the node to be used.
#[clap(long, default_value = "ws://127.0.0.1:9933")]
ws: String,
},
/// Execute the stand alone block monitor and print some transactions stats.
BlockMonitor {
/// The type of chain to be used.
#[clap(long, default_value = "sub")]
chain: ChainType,
/// The RPC endpoint of the node to be used.
#[clap(long, default_value = "ws://127.0.0.1:9933")]
ws: String,
#[clap(long, default_value = "all")]
display: BlockMonitorDisplayOptions,
},
/// Load and inspect existing log file.
LoadLog {
/// The type of chain used to store the log file..
#[clap(long, default_value = "sub")]
chain: ChainType,
/// Name of the file to be loaded.
log_file: String,
#[clap(long)]
/// Display some histograms.
show_graphs: bool,
/// Display errors.
#[clap(long)]
show_errors: bool,
/// Convert loaded log file into CSV file. If specified, CSV file will be written into
/// given location.
#[clap(long)]
out_csv_filename: Option<String>,
},
/// Generate a list of endowed accounts.
GenerateEndowedAccounts {
/// The type of chain used to store the log file..
#[clap(long, default_value = "sub")]
chain: ChainType,
/// First account identifier to be generated (index of the account used for a
/// derivation).
#[clap(long)]
start_id: u32,
/// Last account identifier to be generated.
#[clap(long)]
last_id: u32,
/// Initial balance
#[clap(long, default_value = "100000000000000")]
balance: u128,
/// File where patch with funded accounts json will be stored. Or if the input chain-spec
/// was provided, the location of chain spec with endowed accounts injected.
#[clap(long)]
out_file_name: String,
/// The plain chain spec file that will be used to inject endowed accounts into.
#[clap(long)]
chain_spec: Option<String>,
},
}
+19
View File
@@ -0,0 +1,19 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use serde::{Deserialize, Serialize};
#[derive(Debug, thiserror::Error, Serialize, Deserialize)]
pub enum Error {
/// Subxt error.
#[serde(skip)]
#[error("subxt error: {0}")]
Subxt(#[from] subxt::Error),
/// Other error.
#[error("Other error: {0}")]
Other(String),
/// Mortal transaction lifetime surpassed
#[error("Mortal transaction lifetime surpassed, block number: {0}")]
MortalLifetimeSurpassed(u64),
}
@@ -0,0 +1,788 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::{
error::Error,
runner::{TxTask, TxTaskHash},
transaction::{AccountMetadata, Transaction, TransactionStatus},
};
use average::{Estimate, Max, Mean, Min, Quantile};
use parking_lot::RwLock;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
collections::HashMap,
fmt::Display,
fs::File,
io::{Read, Write},
marker::PhantomData,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, SystemTime},
};
use subxt_core::config::Hash as BlockHash;
use tracing::{debug, info, trace};
pub const STAT_TARGET: &str = "stat";
pub const LOG_TARGET: &str = "execution_log";
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ExecutionEvent<H> {
Popped(SystemTime),
Sent(SystemTime),
Resubmitted(SystemTime),
SubmitResult(SystemTime, Result<(), String>),
SubmitAndWatchResult(SystemTime, Result<(), String>),
TxPoolEvent(SystemTime, TransactionStatus<H>),
FinalizedMonitor(SystemTime, H),
MortalDroppedMonitor(SystemTime, u64),
}
impl<H: BlockHash + DeserializeOwned + std::fmt::Debug> ExecutionEvent<H> {}
impl<H: BlockHash> ExecutionEvent<H> {
pub fn popped() -> Self {
Self::Popped(SystemTime::now())
}
pub fn sent() -> Self {
Self::Sent(SystemTime::now())
}
pub fn submit_and_watch_result(r: Result<(), Error>) -> Self {
Self::SubmitAndWatchResult(SystemTime::now(), r.map_err(|e| e.to_string()))
}
pub fn submit_result(r: Result<(), Error>) -> Self {
Self::SubmitResult(SystemTime::now(), r.map_err(|e| e.to_string()))
}
pub fn finalized_monitor(block_hash: H) -> Self {
Self::FinalizedMonitor(SystemTime::now(), block_hash)
}
pub fn mortal_dropped_monitor(block_number: u64) -> Self {
Self::MortalDroppedMonitor(SystemTime::now(), block_number)
}
}
impl<H: BlockHash> From<TransactionStatus<H>> for ExecutionEvent<H> {
fn from(value: TransactionStatus<H>) -> Self {
Self::TxPoolEvent(SystemTime::now(), value)
}
}
#[derive(Debug, Default)]
pub struct Counters {
popped: AtomicUsize,
sent: AtomicUsize,
submit_success: AtomicUsize,
submit_error: AtomicUsize,
submit_and_watch_success: AtomicUsize,
submit_and_watch_error: AtomicUsize,
finalized_monitor: AtomicUsize,
mortal_dropped_monitor: AtomicUsize,
ts_validated: AtomicUsize,
ts_broadcasted: AtomicUsize,
ts_in_block: AtomicUsize,
ts_finalized: AtomicUsize,
ts_dropped: AtomicUsize,
ts_invalid: AtomicUsize,
ts_error: AtomicUsize,
}
impl Display for Counters {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// let buffered = self.buffered();
write!(
f,
"p {:7} s:{:7} {:7}/{:7} v:{:7} b{:7} B:{:7} f:{:7} d:{:7} i:{:7}",
self.popped.load(Ordering::Relaxed),
self.sent.load(Ordering::Relaxed),
self.submit_and_watch_success.load(Ordering::Relaxed),
self.submit_and_watch_error.load(Ordering::Relaxed),
self.ts_validated.load(Ordering::Relaxed),
self.ts_broadcasted.load(Ordering::Relaxed),
self.ts_in_block.load(Ordering::Relaxed),
self.ts_finalized.load(Ordering::Relaxed),
self.ts_dropped.load(Ordering::Relaxed),
self.ts_invalid.load(Ordering::Relaxed),
)
}
}
impl Counters {
fn inc(x: &AtomicUsize) {
x.fetch_add(1, Ordering::Relaxed);
}
pub fn buffered(&self) -> usize {
self.popped.load(Ordering::Relaxed) -
(self.submit_and_watch_success.load(Ordering::Relaxed) +
self.submit_and_watch_error.load(Ordering::Relaxed)) -
(self.submit_success.load(Ordering::Relaxed) +
self.submit_error.load(Ordering::Relaxed))
}
fn count_event<H: BlockHash>(&self, event: &ExecutionEvent<H>) {
match event {
ExecutionEvent::Popped(_) => Self::inc(&self.popped),
ExecutionEvent::Sent(_) => Self::inc(&self.sent),
ExecutionEvent::SubmitResult(_, Ok(_)) => Self::inc(&self.submit_success),
ExecutionEvent::SubmitResult(_, Err(_)) => Self::inc(&self.submit_error),
ExecutionEvent::SubmitAndWatchResult(_, Ok(_)) =>
Self::inc(&self.submit_and_watch_success),
ExecutionEvent::SubmitAndWatchResult(_, Err(_)) =>
Self::inc(&self.submit_and_watch_error),
ExecutionEvent::FinalizedMonitor(_, _) => Self::inc(&self.finalized_monitor),
ExecutionEvent::TxPoolEvent(_, status) => match status {
TransactionStatus::Validated => Self::inc(&self.ts_validated),
TransactionStatus::Broadcasted => Self::inc(&self.ts_broadcasted),
TransactionStatus::Finalized(_) => Self::inc(&self.ts_finalized),
TransactionStatus::Dropped(_) => Self::inc(&self.ts_dropped),
TransactionStatus::Invalid(_) => Self::inc(&self.ts_invalid),
TransactionStatus::Error(_) => Self::inc(&self.ts_error),
TransactionStatus::InBlock(_) => Self::inc(&self.ts_in_block),
TransactionStatus::NoLongerInBestBlock => {},
},
ExecutionEvent::Resubmitted(_) => {},
ExecutionEvent::MortalDroppedMonitor(_, _) => Self::inc(&self.mortal_dropped_monitor),
}
}
}
/// Type alias for a dictionary of execution logs.
pub type Logs<T> = HashMap<TxTaskHash<T>, Arc<TransactionExecutionLog<TxTaskHash<T>>>>;
/// Trait for accessing transaction log recording.
pub trait ExecutionLog: Sync + Send {
type HashType: BlockHash;
/// Records an execution event associated with the transaction.
fn push_event(&self, event: ExecutionEvent<Self::HashType>);
/// Returns the hash of the transaction.
fn hash(&self) -> Self::HashType;
/// Returns the nonce of the transaction.
fn nonce(&self) -> u128;
/// Retrieves account metadata associated with the transaction.
fn account_metadata(&self) -> AccountMetadata;
/// Returns a list of block hashes where the transaction was included.
fn in_blocks(&self) -> Vec<Self::HashType>;
/// Returns the hash of the finalized block, if available.
fn finalized(&self) -> Option<Self::HashType>;
/// Determines if the transaction's progress is being monitored. If not, some events are not
/// available.
fn is_watched(&self) -> bool;
/// Returns the duration from submission to result reception, if applicable.
fn time_to_result(&self) -> Option<Duration>;
/// Returns the duration from submission to validation event.
fn time_to_validated(&self) -> Option<Duration>;
/// Returns the duration from submission to broadcasted event.
fn time_to_broadcasted(&self) -> Option<Duration>;
/// Returns the duration from submission to finalization.
fn time_to_finalized(&self) -> Option<Duration>;
/// Returns the duration from submission to being included in a block.
fn time_to_inblock(&self) -> Option<Duration>;
/// Returns a list of durations for each inclusion in a block.
fn times_to_inblock(&self) -> Option<Vec<Duration>>;
/// Returns the duration from submission to drop event.
fn time_to_dropped(&self) -> Option<Duration>;
/// Returns the duration from submission to invalidation.
fn time_to_invalid(&self) -> Option<Duration>;
/// Returns the duration from submission to error occurrence.
fn time_to_error(&self) -> Option<Duration>;
/// Returns the duration to finalization as monitored by an external observer.
fn time_to_finalized_monitor(&self) -> Option<Duration>;
/// Returns the duration to mortal dropped as monitored by an external observer.
///
/// The correct moment a transaction is blocked is not known by simply looking at the finalize
/// blocks, but when the tx is dropped, this represents the time it took to declare it as
/// dropped/invalid.
fn time_to_mortal_dropped_monitor(&self) -> Option<Duration>;
/// Retrieves reasons for invalidation of the transaction.
fn get_invalid_reason(&self) -> Vec<String>;
/// Retrieves error reasons encountered during transaction execution.
fn get_error_reason(&self) -> Vec<String>;
/// Retrieves reasons for dropping the transaction.
fn get_dropped_reason(&self) -> Vec<String>;
/// Returns the number of times the transaction has been resent.
fn get_resent_count(&self) -> u32;
/// Retrieves errors from the submit result, if any.
fn get_submit_result_error(&self) -> Vec<String>;
/// Retrieves errors from the submit and watch result, if any. Works for watched transaction
/// only.
fn get_submit_and_watch_result_error(&self) -> Vec<String>;
/// Returns a string representation of in-pool events.
///
/// Examples:
/// `VbBBI` - Validated, Broadcasted, InBlock, InBlock, Invalid
/// `VbBF` - Validated, Broadcasted, InBlock, Finalized
fn get_inpool_events_string(&self) -> String;
/// Returns the system time when the transaction was sent, if available.
fn get_sent_time(&self) -> Option<SystemTime>;
}
#[derive(Debug)]
/// Represents transaction execution log, provides all events assocaited with given transaction.
pub struct TransactionExecutionLog<H: BlockHash> {
/// All events recorded for the transaction.
events: RwLock<Vec<ExecutionEvent<H>>>,
/// Contains all account metadata.
account_metadata: AccountMetadata,
/// Nonce of the transaction.
nonce: u128,
/// Hash of the transaction.
hash: H,
/// Shared instance of global events counter.
total_counters: Arc<Counters>,
}
impl<H: BlockHash + Default> Default for TransactionExecutionLog<H> {
fn default() -> Self {
Self {
events: Default::default(),
nonce: Default::default(),
account_metadata: Default::default(),
hash: Default::default(),
total_counters: Default::default(),
}
}
}
impl<H: BlockHash + 'static + Default> TransactionExecutionLog<H> {
pub fn new_with_events(events: Vec<ExecutionEvent<H>>) -> Self {
Self { events: events.into(), ..Default::default() }
}
}
impl<H: BlockHash + 'static> TransactionExecutionLog<H> {
pub fn new_with_tx(t: &dyn Transaction<HashType = H>, counters: Arc<Counters>) -> Self {
Self {
events: Default::default(),
nonce: t.nonce(),
account_metadata: t.account_metadata(),
hash: t.hash(),
total_counters: counters,
}
}
fn get_sent_time_stamp(&self) -> Option<SystemTime> {
self.events.read().iter().find_map(|e| match e {
ExecutionEvent::Sent(i) => Some(*i),
_ => None,
})
}
fn duration_since_timestamp(
start: Option<SystemTime>,
end: Option<SystemTime>,
) -> Option<Duration> {
if let (Some(start), Some(end)) = (start, end) {
Some(end.duration_since(start).expect("time goes forward."))
} else {
None
}
}
}
impl<H: BlockHash + 'static> ExecutionLog for TransactionExecutionLog<H> {
type HashType = H;
fn push_event(&self, event: ExecutionEvent<Self::HashType>) {
debug!(target:LOG_TARGET, ?event, "B push_event");
if match event {
// note: dedup in block events - on the stats line we want to see transactions included,
// not events count
ExecutionEvent::TxPoolEvent(_, TransactionStatus::InBlock(_)) =>
self.in_blocks().is_empty(),
_ => true,
} {
self.total_counters.count_event(&event);
}
self.events.write().push(event);
trace!(target:LOG_TARGET, "A push_event");
}
// all methods used for generating stats:
fn hash(&self) -> Self::HashType {
self.hash
}
fn nonce(&self) -> u128 {
self.nonce
}
fn account_metadata(&self) -> AccountMetadata {
self.account_metadata.clone()
}
fn is_watched(&self) -> bool {
unimplemented!()
}
fn in_blocks(&self) -> Vec<Self::HashType> {
self.events
.read()
.iter()
.filter_map(|e| match e {
ExecutionEvent::TxPoolEvent(_, TransactionStatus::InBlock(h)) => Some(*h),
_ => None,
})
.collect()
}
fn finalized(&self) -> Option<Self::HashType> {
self.events.read().iter().find_map(|e| match e {
ExecutionEvent::TxPoolEvent(_, TransactionStatus::Finalized(h)) => Some(*h),
_ => None,
})
}
fn time_to_finalized_monitor(&self) -> Option<Duration> {
let fmts = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::FinalizedMonitor(i, _) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), fmts)
}
fn time_to_mortal_dropped_monitor(&self) -> Option<Duration> {
let fmts = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::MortalDroppedMonitor(i, _) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), fmts)
}
fn time_to_finalized(&self) -> Option<Duration> {
let fts = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::TxPoolEvent(i, TransactionStatus::Finalized(_)) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), fts)
}
fn time_to_validated(&self) -> Option<Duration> {
let vts = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::TxPoolEvent(i, TransactionStatus::Validated) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), vts)
}
fn time_to_broadcasted(&self) -> Option<Duration> {
let bts = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::TxPoolEvent(i, TransactionStatus::Broadcasted) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), bts)
}
fn time_to_inblock(&self) -> Option<Duration> {
let bts = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::TxPoolEvent(i, TransactionStatus::InBlock(_)) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), bts)
}
fn times_to_inblock(&self) -> Option<Vec<Duration>> {
unimplemented!()
}
fn time_to_dropped(&self) -> Option<Duration> {
let dts = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::TxPoolEvent(i, TransactionStatus::Dropped(_)) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), dts)
}
fn time_to_invalid(&self) -> Option<Duration> {
let its = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::TxPoolEvent(i, TransactionStatus::Invalid(_)) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), its)
}
fn time_to_error(&self) -> Option<Duration> {
let ets = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::TxPoolEvent(i, TransactionStatus::Error(_)) => Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), ets)
}
fn time_to_result(&self) -> Option<Duration> {
let ets = self.events.read().iter().find_map(|e| match e {
ExecutionEvent::SubmitAndWatchResult(i, _) | ExecutionEvent::SubmitResult(i, _) =>
Some(*i),
_ => None,
});
Self::duration_since_timestamp(self.get_sent_time_stamp(), ets)
}
fn get_invalid_reason(&self) -> Vec<String> {
self.events
.read()
.iter()
.filter_map(|e| match e {
ExecutionEvent::TxPoolEvent(_, TransactionStatus::Invalid(reason)) =>
Some(reason.clone()),
_ => None,
})
.collect()
}
fn get_error_reason(&self) -> Vec<String> {
self.events
.read()
.iter()
.filter_map(|e| match e {
ExecutionEvent::TxPoolEvent(_, TransactionStatus::Error(reason)) =>
Some(reason.clone()),
_ => None,
})
.collect()
}
fn get_dropped_reason(&self) -> Vec<String> {
self.events
.read()
.iter()
.filter_map(|e| match e {
ExecutionEvent::TxPoolEvent(_, TransactionStatus::Dropped(reason)) =>
Some(reason.clone()),
_ => None,
})
.collect()
}
fn get_resent_count(&self) -> u32 {
unimplemented!()
}
fn get_submit_result_error(&self) -> Vec<String> {
self.events
.read()
.iter()
.filter_map(|e| match e {
ExecutionEvent::SubmitResult(_, Err(reason)) => Some(reason.clone()),
_ => None,
})
.collect()
}
fn get_submit_and_watch_result_error(&self) -> Vec<String> {
self.events
.read()
.iter()
.filter_map(|e| match e {
ExecutionEvent::SubmitAndWatchResult(_, Err(reason)) => Some(reason.clone()),
_ => None,
})
.collect()
}
fn get_inpool_events_string(&self) -> String {
self.events
.read()
.iter()
.filter_map(|e| match e {
ExecutionEvent::TxPoolEvent(_, p) => Some(p.get_letter()),
_ => None,
})
.collect()
}
fn get_sent_time(&self) -> Option<SystemTime> {
self.get_sent_time_stamp()
}
}
pub fn single_stat<'a, E: ExecutionLog + 'a>(
name: String,
logs: impl Iterator<Item = &'a Arc<E>>,
method: fn(&E) -> Option<Duration>,
show_graph: bool,
) {
let mut v: Vec<f64> = vec![];
for l in logs {
let time_to_event = method(&**l);
if let Some(time_to_event) = time_to_event {
v.push(time_to_event.as_secs_f64());
}
}
let mean: Mean = v.iter().collect();
let max: Max = v.iter().collect();
let min: Min = v.iter().collect();
let mut third_quartile = Quantile::new(0.9);
v.iter().for_each(|x| third_quartile.add(*x));
info!(
target: STAT_TARGET,
count = mean.len(),
min = min.min(),
max = max.max(),
mean = mean.mean(),
q90 = third_quartile.quantile(),
"{name}"
);
if show_graph {
use termplot::*;
let mut plot = Plot::default();
plot.set_domain(Domain(min.min()..max.max()))
.set_codomain(Domain(0.0..mean.len() as f64))
.set_title(&name)
.set_x_label("X axis")
.set_y_label("Y axis")
.set_size(Size::new(80, 45))
.add_plot(Box::new(plot::Histogram::new_with_buckets_count(v, 20)));
println!("{plot}");
}
}
pub fn failure_reason_stats<'a, E: ExecutionLog + 'a>(
name: String,
logs: impl Iterator<Item = &'a Arc<E>>,
method: fn(&E) -> Vec<String>,
) {
let mut map = HashMap::<String, usize>::new();
for l in logs {
for reason in method(&**l) {
*map.entry(reason).or_default() += 1;
}
}
info!(
target: STAT_TARGET,
?map, "{name} -> {:#?}", map);
}
pub fn make_stats<E: ExecutionLog>(logs: impl IntoIterator<Item = Arc<E>>, show_graphs: bool) {
let logs = logs.into_iter().collect::<Vec<_>>();
info!(target: STAT_TARGET, total_recorded_count = logs.len());
single_stat("Time to dropped".into(), logs.iter(), E::time_to_dropped, show_graphs);
single_stat("Time to error".into(), logs.iter(), E::time_to_error, show_graphs);
single_stat("Time to invalid".into(), logs.iter(), E::time_to_invalid, show_graphs);
single_stat("Time to result".into(), logs.iter(), E::time_to_result, show_graphs);
single_stat("Time to validated".into(), logs.iter(), E::time_to_validated, show_graphs);
single_stat("Time to broadcasted".into(), logs.iter(), E::time_to_broadcasted, show_graphs);
single_stat("Time to in_block".into(), logs.iter(), E::time_to_inblock, show_graphs);
single_stat("Time to finalization".into(), logs.iter(), E::time_to_finalized, show_graphs);
single_stat(
"Time to finalization (monitor)".into(),
logs.iter(),
E::time_to_finalized_monitor,
show_graphs,
);
single_stat(
"Time to dropped (monitor)".into(),
logs.iter(),
E::time_to_mortal_dropped_monitor,
show_graphs,
);
failure_reason_stats("Dropped".into(), logs.iter(), E::get_dropped_reason);
failure_reason_stats("Error".into(), logs.iter(), E::get_error_reason);
failure_reason_stats("Invalid".into(), logs.iter(), E::get_invalid_reason);
failure_reason_stats("submit errors".into(), logs.iter(), E::get_submit_result_error);
failure_reason_stats(
"submit_and_watch errors".into(),
logs.iter(),
E::get_submit_and_watch_result_error,
);
let mut timeline_map = HashMap::<String, usize>::default();
let mut logs = logs.into_iter().filter(|e| e.get_sent_time().is_some()).collect::<Vec<_>>();
logs.sort_by_key(|e| e.get_sent_time().unwrap());
for e in &logs {
// info!("{:?}/{:3?} -> {}", e.account_metadata(), e.nonce(), e.get_inpool_events_string());
*timeline_map.entry(e.get_inpool_events_string()).or_default() += 1;
}
timeline_map.iter().for_each(|(l, c)| {
info!("{:>30} : {:?}", l, c);
});
// info!("sorted --------------------");
// logs.sort_by_key(|e| e.nonce());
// for e in logs {
// info!("{:?}/{:3?} -> {}", e.account_metadata(), e.nonce(), e.get_inpool_events_string());
// }
}
pub mod journal {
use std::path::Path;
use super::*;
pub struct Journal<T: TxTask> {
_p: PhantomData<T>,
}
//hack
#[derive(Serialize, Deserialize)]
struct DefaultExecutionLogSerdeHelper<H> {
events: Vec<ExecutionEvent<H>>,
account_metadata: AccountMetadata,
nonce: u128,
hash: H,
}
impl<H: BlockHash> DefaultExecutionLogSerdeHelper<H> {}
impl<H: BlockHash> From<DefaultExecutionLogSerdeHelper<H>> for TransactionExecutionLog<H> {
fn from(value: DefaultExecutionLogSerdeHelper<H>) -> Self {
TransactionExecutionLog {
events: value.events.clone().into(),
account_metadata: value.account_metadata,
nonce: value.nonce,
hash: value.hash,
total_counters: Default::default(),
}
}
}
impl<H: BlockHash + 'static> TransactionExecutionLog<H> {
fn get_data(&self) -> DefaultExecutionLogSerdeHelper<H> {
DefaultExecutionLogSerdeHelper {
events: self.events.read().clone(),
account_metadata: self.account_metadata.clone(),
nonce: self.nonce,
hash: self.hash,
}
}
}
impl<T: TxTask> Journal<T>
where
TxTaskHash<T>: 'static,
{
pub fn save_logs(logs: Logs<T>, filename: &Path) {
let data = logs.into_iter().map(|(h, l)| (h, l.get_data())).collect::<HashMap<_, _>>();
let json = serde_json::to_string(&data).unwrap();
let mut file = File::create(filename).unwrap();
file.write_all(json.as_bytes()).unwrap();
}
pub fn load_logs(name: &str, csv_filename: &Option<String>) -> Logs<T> {
let mut file = File::open(name).expect("Unable to open file");
let mut json = String::new();
file.read_to_string(&mut json).expect("Unable to read file");
let data: HashMap<TxTaskHash<T>, DefaultExecutionLogSerdeHelper<TxTaskHash<T>>> =
serde_json::from_str(&json).expect("Unable to deserialize JSON");
let data: Logs<T> = data
.into_iter()
.map(|l| (l.0, Arc::new(TransactionExecutionLog::from(l.1))))
.collect();
if let Some(csv_filename) = csv_filename {
Self::save_csv(data.clone(), Path::new(csv_filename));
}
data
}
pub fn save_csv(logs: Logs<T>, filename: &Path) {
let mut raw_data = HashMap::<TxTaskHash<T>, CsvEntry<T>>::default();
for (h, v) in logs {
raw_data.insert(h, v.into());
}
let mut file = File::create(filename).unwrap();
writeln!(file, "hash,{}", CsvEntry::<T>::header_line()).unwrap();
for (h, v) in raw_data {
writeln!(file, "{h:?},{v}").unwrap();
}
}
}
struct CsvEntry<T: TxTask> {
finalized: bool,
in_block: bool,
time_to_finalized: Option<Duration>,
time_to_result: Option<Duration>,
time_to_validated: Option<Duration>,
time_to_broadcasted: Option<Duration>,
time_to_in_block: Option<Duration>,
time_to_finalized_monitor: Option<Duration>,
_p: PhantomData<T>,
}
impl<T: TxTask> CsvEntry<T> {
fn header_line() -> String {
format!(
"{},{},{},{},{},{},{},{}",
"finalized",
"in_block",
"time_to_finalized",
"time_to_result",
"time_to_validated",
"time_to_broadcasted",
"time_to_in_block",
"time_to_finalized_monitor"
)
.to_string()
}
}
use std::fmt::{self, Formatter, Result};
impl<T: TxTask> fmt::Display for CsvEntry<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
write!(
f,
"{},{},{},{},{},{},{},{}",
self.finalized as u8,
self.in_block as u8,
self.time_to_finalized.unwrap_or_default().as_millis(),
self.time_to_result.unwrap_or_default().as_millis(),
self.time_to_validated.unwrap_or_default().as_millis(),
self.time_to_broadcasted.unwrap_or_default().as_millis(),
self.time_to_in_block.unwrap_or_default().as_millis(),
self.time_to_finalized_monitor.unwrap_or_default().as_millis()
)
}
}
impl<T: TxTask> From<Arc<TransactionExecutionLog<TxTaskHash<T>>>> for CsvEntry<T> {
fn from(l: Arc<TransactionExecutionLog<TxTaskHash<T>>>) -> Self {
Self {
finalized: l.time_to_finalized().is_some(),
in_block: l.time_to_inblock().is_some(),
time_to_finalized: l.time_to_finalized(),
time_to_result: l.time_to_result(),
time_to_validated: l.time_to_validated(),
time_to_broadcasted: l.time_to_broadcasted(),
time_to_in_block: l.time_to_inblock(),
time_to_finalized_monitor: l.time_to_finalized_monitor(),
_p: Default::default(),
}
}
}
}
@@ -0,0 +1,370 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::{
error::Error,
helpers::StreamOf,
transaction::{AccountMetadata, Transaction, TransactionStatus},
};
use futures::stream::{self};
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use std::{
any::Any,
fmt,
str::FromStr,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use subxt::ext::codec::{Decode, Encode};
use tokio::task::yield_now;
use tracing::trace;
pub(crate) const LOG_TARGET: &str = "fake_tx";
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Encode, Decode)]
#[serde(try_from = "String", into = "String")]
pub(crate) struct FakeHash([u8; 4]);
impl AsRef<[u8]> for FakeHash {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl From<[u8; 4]> for FakeHash {
fn from(value: [u8; 4]) -> Self {
Self(value)
}
}
impl fmt::Debug for FakeHash {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}
impl fmt::Display for FakeHash {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", hex::encode(self.0))
}
}
impl FromStr for FakeHash {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut bytes = [0; 4];
hex::decode_to_slice(s, &mut bytes).map_err(|_| "hex::decode failed".to_string())?;
Ok(FakeHash(bytes))
}
}
impl From<FakeHash> for String {
fn from(hash: FakeHash) -> Self {
hex::encode(hash.0)
}
}
impl TryFrom<String> for FakeHash {
type Error = String;
fn try_from(value: String) -> Result<Self, Self::Error> {
FakeHash::from_str(&value)
}
}
#[derive(Clone)]
pub struct EventDef {
event: TransactionStatus<FakeHash>,
delay: u32,
}
impl EventDef {
pub fn validated(delay: u32) -> Self {
Self { event: TransactionStatus::Validated, delay }
}
pub fn broadcasted(delay: u32) -> Self {
Self {
//todo
event: TransactionStatus::Broadcasted,
delay,
}
}
pub fn in_block(h: u32, delay: u32) -> Self {
Self { event: TransactionStatus::InBlock(h.to_le_bytes().into()), delay }
}
pub fn finalized(h: u32, delay: u32) -> Self {
Self { event: TransactionStatus::Finalized(h.to_le_bytes().into()), delay }
}
pub fn dropped(delay: u32) -> Self {
Self { event: TransactionStatus::Dropped("xxx".to_string()), delay }
}
pub fn invalid(delay: u32) -> Self {
Self { event: TransactionStatus::Invalid("xxx".to_string()), delay }
}
pub fn error(delay: u32) -> Self {
Self { event: TransactionStatus::Error("xxx".to_string()), delay }
}
}
#[derive(Clone)]
pub(crate) struct EventsStreamDef(pub Vec<EventDef>);
impl From<Vec<EventDef>> for EventsStreamDef {
fn from(value: Vec<EventDef>) -> Self {
Self(value)
}
}
pub(crate) struct FakeTransaction {
hash: FakeHash,
stream_def: Vec<EventsStreamDef>,
current_stream_def: AtomicUsize,
nonce: u128,
account_metadata: AccountMetadata,
}
impl Transaction for FakeTransaction {
type HashType = FakeHash;
fn hash(&self) -> Self::HashType {
self.hash
}
fn as_any(&self) -> &dyn Any {
self
}
fn nonce(&self) -> u128 {
self.nonce
}
fn account_metadata(&self) -> AccountMetadata {
self.account_metadata.clone()
}
fn valid_until(&self) -> &Option<u64> {
&None
}
}
#[allow(dead_code)]
impl FakeTransaction {
pub(crate) fn get_current_stream_def(&self) -> EventsStreamDef {
self.stream_def[self.current_stream_def.load(Ordering::Relaxed)].clone()
}
pub(crate) fn new_multiple(index: u32, nonce: u128, stream_def: Vec<EventsStreamDef>) -> Self {
Self {
stream_def,
hash: index.to_le_bytes().into(),
current_stream_def: Default::default(),
nonce,
account_metadata: AccountMetadata::Derived(index),
}
}
pub(crate) fn new_with_keyring(
account: String,
nonce: u128,
stream_def: Vec<EventsStreamDef>,
) -> Self {
let acc_as_bytes = account.as_bytes();
Self {
stream_def,
hash: [acc_as_bytes[0], acc_as_bytes[1], acc_as_bytes[2], acc_as_bytes[3]].into(),
current_stream_def: Default::default(),
nonce,
account_metadata: AccountMetadata::KeyRing(account),
}
}
pub(crate) fn new(index: u32, nonce: u128, stream_def: EventsStreamDef) -> Self {
Self {
stream_def: vec![stream_def],
hash: index.to_le_bytes().into(),
current_stream_def: Default::default(),
nonce,
account_metadata: AccountMetadata::Derived(index),
}
}
pub(crate) fn new_inblock_then_droppable_2nd_success(
hash: u32,
nonce: u128,
delay: u32,
) -> Self {
Self::new_multiple(
hash,
nonce,
vec![
EventsStreamDef(vec![
EventDef::broadcasted(delay),
EventDef::validated(delay),
EventDef::in_block(1, delay),
EventDef::in_block(2, delay),
EventDef::in_block(3, delay),
EventDef::dropped(delay),
]),
EventsStreamDef(vec![
EventDef::broadcasted(delay),
EventDef::validated(delay),
EventDef::in_block(1, delay),
EventDef::in_block(2, delay),
EventDef::in_block(3, delay),
EventDef::finalized(2, delay),
]),
],
)
}
pub(crate) fn new_droppable_2nd_success(hash: u32, nonce: u128, delay: u32) -> Self {
Self::new_multiple(
hash,
nonce,
vec![
EventsStreamDef(vec![EventDef::dropped(delay)]),
EventsStreamDef(vec![
EventDef::broadcasted(delay),
EventDef::validated(delay),
EventDef::in_block(1, delay),
EventDef::in_block(2, delay),
EventDef::in_block(3, delay),
EventDef::finalized(2, delay),
]),
],
)
}
pub(crate) fn new_droppable_loop(hash: u32, nonce: u128, delay: u32) -> Self {
Self::new_multiple(
hash,
nonce,
vec![
EventsStreamDef(vec![EventDef::dropped(delay)]),
EventsStreamDef(vec![EventDef::dropped(delay)]),
EventsStreamDef(vec![EventDef::dropped(delay)]),
EventsStreamDef(vec![EventDef::dropped(delay)]),
EventsStreamDef(vec![EventDef::dropped(delay)]),
],
)
}
pub(crate) fn new_droppable(hash: u32, nonce: u128, delay: u32) -> Self {
Self::new(hash, nonce, EventsStreamDef(vec![EventDef::dropped(delay)]))
}
pub(crate) fn new_invalid(hash: u32, nonce: u128, delay: u32) -> Self {
Self::new(hash, nonce, EventsStreamDef(vec![EventDef::invalid(delay)]))
}
pub(crate) fn new_error(hash: u32, nonce: u128, delay: u32) -> Self {
Self::new(hash, nonce, EventsStreamDef(vec![EventDef::error(delay)]))
}
pub(crate) fn new_finalizable_quick(hash: u32, nonce: u128) -> Self {
let delay = 0;
Self::new(
hash,
nonce,
EventsStreamDef(vec![
EventDef::broadcasted(delay),
EventDef::validated(delay),
EventDef::in_block(1, delay),
EventDef::in_block(2, delay),
EventDef::in_block(3, delay),
EventDef::finalized(2, delay),
]),
)
}
pub(crate) fn new_finalizable(hash: u32, nonce: u128) -> Self {
Self::new(
hash,
nonce,
EventsStreamDef(vec![
EventDef::broadcasted(100),
EventDef::validated(300),
EventDef::in_block(1, 1000),
EventDef::in_block(2, 1000),
EventDef::in_block(3, 1000),
EventDef::finalized(2, 2000),
]),
)
}
pub(crate) fn events(&self) -> StreamOf<TransactionStatus<FakeHash>> {
let def = self.get_current_stream_def();
stream::unfold(def.0.into_iter(), move |mut i| async {
yield_now().await;
if let Some(EventDef { event, delay }) = i.next() {
if delay > 0 {
tokio::time::sleep(Duration::from_millis(delay.into())).await;
}
trace!(target:LOG_TARGET, ?event, ?delay, "play");
Some((event, i))
} else {
None
}
})
.boxed()
}
pub(crate) async fn submit_result(&self) -> Result<FakeHash, Error> {
let EventDef { event, delay } = self
.get_current_stream_def()
.0
.pop()
.expect("there shall be at least event. qed.");
if delay > 0 {
tokio::time::sleep(Duration::from_millis(delay.into())).await;
}
trace!(target:LOG_TARGET, "submit_result: delayed: {:?}", self.hash);
match event {
TransactionStatus::Finalized(_) => Ok(self.hash),
TransactionStatus::Dropped(message) =>
Err(Error::Other(format!("submit-error:dropped:{message}").to_string())),
TransactionStatus::Invalid(message) =>
Err(Error::Other(format!("submit-error:invalid:{message}").to_string())),
TransactionStatus::Error(message) =>
Err(Error::Other(format!("submit-error:error:{message}").to_string())),
TransactionStatus::Validated |
TransactionStatus::NoLongerInBestBlock |
TransactionStatus::Broadcasted |
TransactionStatus::InBlock(_) => todo!(),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::init_logger;
#[tokio::test]
async fn fake_transaction_events() {
init_logger();
let t = FakeTransaction::new(
1,
0,
EventsStreamDef(vec![
EventDef::broadcasted(100),
EventDef::validated(300),
EventDef::in_block(1, 1000),
EventDef::in_block(2, 1000),
EventDef::in_block(3, 1000),
EventDef::finalized(2, 2000),
]),
);
let v = t.events().collect::<Vec<_>>().await;
assert_eq!(
v,
vec![
TransactionStatus::Broadcasted,
TransactionStatus::Validated,
TransactionStatus::InBlock(1u32.to_le_bytes().into()),
TransactionStatus::InBlock(2u32.to_le_bytes().into()),
TransactionStatus::InBlock(3u32.to_le_bytes().into()),
TransactionStatus::Finalized(2u32.to_le_bytes().into())
]
);
tracing::info!("{v:?}")
}
}
@@ -0,0 +1,165 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::{
error::Error,
fake_transaction::{FakeHash, FakeTransaction},
helpers::StreamOf,
transaction::{
Transaction, TransactionMonitor, TransactionStatus, TransactionStatusIsDone,
TransactionsSink,
},
};
use async_trait::async_trait;
use futures_util::StreamExt;
use parking_lot::RwLock;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tracing::trace;
#[derive(Default, Clone)]
pub(crate) struct FakeTransactionsSink {
txs: Arc<RwLock<HashSet<FakeHash>>>,
#[allow(dead_code)]
pub(crate) nonces: Arc<RwLock<HashMap<String, u128>>>,
}
#[async_trait]
impl TransactionsSink<FakeHash> for FakeTransactionsSink {
async fn submit_and_watch(
&self,
tx: &dyn Transaction<HashType = FakeHash>,
) -> Result<StreamOf<TransactionStatus<FakeHash>>, Error> {
trace!(target: crate::fake_transaction::LOG_TARGET, "submit_and_watch");
let hash = tx.hash();
self.txs.write().insert(tx.hash());
let txs = self.txs.clone();
let tx = tx.as_any().downcast_ref::<FakeTransaction>().unwrap();
trace!(target: crate::fake_transaction::LOG_TARGET, hash=?tx.hash(),"submit_and_watch");
Ok(tx
.events()
.map(move |e| {
if e.is_terminal() {
txs.write().remove(&hash);
};
e
})
.boxed())
}
async fn submit(&self, tx: &dyn Transaction<HashType = FakeHash>) -> Result<FakeHash, Error> {
self.txs.write().insert(tx.hash());
let tx = tx.as_any().downcast_ref::<FakeTransaction>().unwrap();
let result = tx.submit_result().await;
self.txs.write().remove(&tx.hash());
result
}
///Current count of transactions being processed by sink
async fn pending_extrinsics(&self) -> usize {
self.txs.read().len()
}
fn transaction_monitor(&self) -> Option<&dyn TransactionMonitor<FakeHash>> {
None
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use super::*;
use crate::{fake_transaction::EventDef, init_logger};
use futures::future::join3;
use tracing::info;
#[tokio::test]
async fn fake_sink_submit_and_watch_works() {
let rpc = FakeTransactionsSink::default();
let t = FakeTransaction::new_finalizable(1, 0);
let t: Box<dyn Transaction<HashType = FakeHash>> = Box::from(t);
let events = rpc.submit_and_watch(&*t).await.unwrap();
assert_eq!(rpc.pending_extrinsics().await, 1);
let v = events.collect::<Vec<_>>().await;
assert_eq!(rpc.pending_extrinsics().await, 0);
assert_eq!(
v,
vec![
TransactionStatus::Broadcasted,
TransactionStatus::Validated,
TransactionStatus::InBlock(1u32.to_le_bytes().into()),
TransactionStatus::InBlock(2u32.to_le_bytes().into()),
TransactionStatus::InBlock(3u32.to_le_bytes().into()),
TransactionStatus::Finalized(2u32.to_le_bytes().into())
]
);
}
#[tokio::test]
async fn fake_sink_submit_work_droppable() {
let rpc = FakeTransactionsSink::default();
let t = FakeTransaction::new_droppable(1, 0, 10);
let t: Box<dyn Transaction<HashType = FakeHash>> = Box::from(t);
let r = rpc.submit(&*t).await.unwrap_err();
assert_eq!(r.to_string(), Error::Other("submit-error:dropped:xxx".to_string()).to_string());
}
#[tokio::test]
async fn fake_sink_submit_work_invalid() {
let rpc = FakeTransactionsSink::default();
let t = FakeTransaction::new_invalid(1, 0, 10);
let t: Box<dyn Transaction<HashType = FakeHash>> = Box::from(t);
let r = rpc.submit(&*t).await.unwrap_err();
assert_eq!(r.to_string(), Error::Other("submit-error:invalid:xxx".to_string()).to_string());
}
#[tokio::test]
async fn fake_sink_submit_work_error() {
let rpc = FakeTransactionsSink::default();
let t = FakeTransaction::new_error(1, 0, 10);
let t: Box<dyn Transaction<HashType = FakeHash>> = Box::from(t);
let r = rpc.submit(&*t).await.unwrap_err();
assert_eq!(r.to_string(), Error::Other("submit-error:error:xxx".to_string()).to_string());
}
#[tokio::test]
async fn fake_sink_submit_work_valid() {
let rpc = FakeTransactionsSink::default();
let t = FakeTransaction::new_finalizable_quick(111, 0);
let t: Box<dyn Transaction<HashType = FakeHash>> = Box::from(t);
let r = rpc.submit(&*t).await.unwrap();
assert_eq!(r, 111u32.to_le_bytes().into());
}
#[tokio::test]
async fn fake_sink_submit_work_valid_2() {
init_logger();
info!("start");
let rpc = Arc::from(FakeTransactionsSink::default());
let t1 = FakeTransaction::new(111, 0, vec![EventDef::finalized(11, 250)].into());
let t2 = FakeTransaction::new(222, 0, vec![EventDef::finalized(12, 250)].into());
let t1: Box<dyn Transaction<HashType = FakeHash>> = Box::from(t1);
let t2: Box<dyn Transaction<HashType = FakeHash>> = Box::from(t2);
let f = || async {
tokio::time::sleep(Duration::from_millis(200)).await;
rpc.pending_extrinsics().await
};
let result = join3(rpc.submit(&*t1), rpc.submit(&*t2), f()).await;
let r1 = result.0;
let r2 = result.1;
assert_eq!(result.2, 2);
assert_eq!(rpc.pending_extrinsics().await, 0);
assert_eq!(r1.unwrap(), 111u32.to_le_bytes().into());
assert_eq!(r2.unwrap(), 222u32.to_le_bytes().into());
}
//todo: submit_and_watch can return error
}
@@ -0,0 +1,38 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
pub use jsonrpsee::{
client_transport::ws::{self, EitherStream, Url, WsTransportClientBuilder},
core::client::{Client, Error},
};
use std::{pin::Pin, time::Duration};
use tokio_util::compat::Compat;
/// Helper type for a futures stream.
pub(crate) type StreamOf<I> = Pin<Box<dyn futures::Stream<Item = I> + Send>>;
/// Type alias for a websocket sender.
pub(crate) type Sender = ws::Sender<Compat<EitherStream>>;
/// Type alias for a websocket receiver.
pub(crate) type Receiver = ws::Receiver<Compat<EitherStream>>;
/// Build WS RPC client from URL
pub(crate) async fn client(url: &str) -> Result<Client, Error> {
let (sender, receiver) = ws_transport(url).await?;
Ok(Client::builder()
.max_buffer_capacity_per_subscription(4096)
.max_concurrent_requests(1280000)
.build_with_tokio(sender, receiver))
}
async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
let url = Url::parse(url).map_err(|e| Error::Transport(e.into()))?;
WsTransportClientBuilder::default()
.max_request_size(400 * 1024 * 1024)
.max_response_size(400 * 1024 * 1024)
.connection_timeout(Duration::from_secs(600))
.build(url)
.await
.map_err(|e| Error::Transport(e.into()))
}
+103
View File
@@ -0,0 +1,103 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! The Transaction Test Tool is a library allowing to send transactions to a substrate network,
//! monitor their status within the transaction pool. The main purpose of the library is to put a
//! network under different scenarios and ensure transaction pool behaves as expected.
//!
//! Additionally, there is a companion command-line interface (CLI) that offers an alternative
//! means of utilizing the library's capabilities.
//!
//! Use [`scenario::ScenarioBuilder`] to create a [`scenario::ScenarioExecutor`] instance, which
//! then executes the scenario against the node.
//!
//! Example:
//!
//! ```rust,ignore
//! // Shared Params
//! let send_threshold = 20_000;
//! let ws = "ws://127.0.0.1:9933";
//! let block_monitor = false;
//! let watched_txs = true;
//!
//! // Setup for scenario executor
//! let scenario_executor = ScenarioBuilder::new()
//! .with_rpc_uri(ws.to_string())
//! .with_chain_type(ChainType::Sub)
//! .with_block_monitoring(true)
//! .with_start_id("0".to_string())
//! .with_last_id(99)
//! .with_nonce_from(Some(0))
//! .with_txs_count(100)
//! .with_watched_txs(watched_txs)
//! .with_send_threshold(send_threshold)
//! .build()
//! .await;
//!
//! let logs = scenario_executor.execute().await;
//! ```
pub mod block_monitor;
pub mod cli;
pub mod error;
pub mod execution_log;
pub mod fake_transaction;
pub mod fake_transaction_sink;
pub mod helpers;
pub mod runner;
pub mod scenario;
pub mod subxt_api_connector;
pub mod subxt_transaction;
pub mod transaction;
// Re-export commonly used types for custom payload builders
pub use subxt_transaction::{
eth_transfer_payload_builder, remark_payload_builder, sub_transfer_payload_builder,
EthPayloadBuilderFn, EthTxBuildContext, PayloadBuilderFn, SubPayloadBuilderFn,
SubTxBuildContext, TxPayloadBuildContext,
};
/// Initialize the logger for various binaries (e.g. ttxt or test binaries).
pub fn init_logger() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
use tracing::Metadata;
use tracing_subscriber::{
fmt,
layer::{Context, Filter, SubscriberExt},
registry, EnvFilter, Layer,
};
let filter = EnvFilter::from_default_env();
struct F {
env_filter: EnvFilter,
}
impl Default for F {
fn default() -> Self {
Self { env_filter: EnvFilter::from_default_env() }
}
}
impl<S> Filter<S> for F {
fn enabled(&self, meta: &Metadata<'_>, cx: &Context<'_, S>) -> bool {
!self.env_filter.enabled(meta, cx.clone()) &&
meta.target() == execution_log::STAT_TARGET
}
}
let debug_layer = fmt::layer().with_target(true).with_filter(filter);
let stat_layer = fmt::layer()
.with_target(false)
.with_level(false)
.without_time()
.with_filter(F::default());
let subscriber = registry::Registry::default().with(debug_layer).with(stat_layer);
tracing::subscriber::set_global_default(subscriber)
.expect("Unable to set a global subscriber");
});
}
+582
View File
@@ -0,0 +1,582 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::{
error::Error,
execution_log::{
journal::Journal, make_stats, Counters, ExecutionEvent, ExecutionLog, Logs,
TransactionExecutionLog, STAT_TARGET,
},
transaction::{Transaction, TransactionStatusIsDone, TransactionsSink},
};
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, Future, StreamExt};
use std::{
path::Path,
pin::Pin,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use subxt_core::config::Hash as BlockHash;
use tokio::{
select,
sync::mpsc::{channel, Receiver, Sender},
};
use tracing::{debug, error, info, instrument, trace, warn, Span};
const LOG_TARGET: &str = "runner";
/// Transaction hash type.
pub type TxTaskHash<T> = <<T as TxTask>::Transaction as Transaction>::HashType;
/// Provides a transaction execution result.
pub enum ExecutionResult<T: TxTask> {
Error(TxTaskHash<T>),
Done(TxTaskHash<T>),
}
impl<T: TxTask> std::fmt::Debug for ExecutionResult<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Error(h) => write!(f, "Error {h:?}"),
Self::Done(h) => write!(f, "Done {h:?}"),
}
}
}
/// Interface for tasks that monitor transaction execution.
#[async_trait]
pub trait TxTask: Send + Sync + Sized + std::fmt::Debug {
type Transaction: Transaction;
fn tx(&self) -> &Self::Transaction;
fn is_watched(&self) -> bool;
async fn send_watched_tx(
self,
log: Arc<dyn ExecutionLog<HashType = TxTaskHash<Self>>>,
rpc: Arc<dyn TransactionsSink<TxTaskHash<Self>>>,
) -> ExecutionResult<Self>;
async fn send_tx(
self,
log: Arc<dyn ExecutionLog<HashType = TxTaskHash<Self>>>,
rpc: Arc<dyn TransactionsSink<TxTaskHash<Self>>>,
) -> ExecutionResult<Self>;
async fn execute(
self,
log: Arc<dyn ExecutionLog<HashType = TxTaskHash<Self>>>,
rpc: Arc<dyn TransactionsSink<TxTaskHash<Self>>>,
) -> ExecutionResult<Self>;
}
/// Holds the logic for a transaction submission.
pub struct DefaultTxTask<T>
where
T: Transaction,
<T as Transaction>::HashType: 'static,
{
tx: T,
watched: bool,
}
impl<T> std::fmt::Debug for DefaultTxTask<T>
where
T: Transaction,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("tx").field("nonce", &self.tx.nonce()).finish()
}
}
#[async_trait]
impl<H: BlockHash, T: Transaction<HashType = H> + Send> TxTask for DefaultTxTask<T> {
type Transaction = T;
fn tx(&self) -> &T {
&self.tx
}
fn is_watched(&self) -> bool {
self.watched
}
async fn send_watched_tx(
self,
log: Arc<dyn ExecutionLog<HashType = TxTaskHash<Self>>>,
rpc: Arc<dyn TransactionsSink<TxTaskHash<Self>>>,
) -> ExecutionResult<Self> {
log.push_event(ExecutionEvent::sent());
match rpc.submit_and_watch(self.tx()).await {
Ok(mut stream) => {
log.push_event(ExecutionEvent::submit_and_watch_result(Ok(())));
while let Some(status) = stream.next().await {
debug!(target:LOG_TARGET,tx=?self,"status: {status:?}");
log.push_event(status.clone().into());
if status.is_finalized() {
return ExecutionResult::Done(self.tx().hash());
} else if status.is_error() {
return ExecutionResult::Error(self.tx().hash());
} else {
continue;
}
}
//shall not happen to be here, return error.
warn!(target:LOG_TARGET,tx=?self,"stream error");
ExecutionResult::Error(self.tx().hash())
},
Err(e) => {
info!(nonce=?self.tx().nonce(),"submit_and_watch: error: {e:?}");
let result = ExecutionResult::Error(self.tx().hash());
// debug!(?hash, nonce, "error: {e:?} {result:?}");
log.push_event(ExecutionEvent::submit_and_watch_result(Err(e)));
result
},
}
}
async fn send_tx(
self,
log: Arc<dyn ExecutionLog<HashType = TxTaskHash<Self>>>,
rpc: Arc<dyn TransactionsSink<TxTaskHash<Self>>>,
) -> ExecutionResult<Self> {
log.push_event(ExecutionEvent::sent());
match rpc.submit(self.tx()).await {
Ok(_) => {
log.push_event(ExecutionEvent::submit_result(Ok(())));
if let Some(monitor) = rpc.transaction_monitor() {
match monitor.wait(self.tx().hash(), *self.tx().valid_until()).await {
Ok(block_hash) => {
log.push_event(ExecutionEvent::finalized_monitor(block_hash));
ExecutionResult::Done(self.tx().hash())
},
Err(err) => {
match err {
Error::MortalLifetimeSurpassed(block_number) => log.push_event(
ExecutionEvent::mortal_dropped_monitor(block_number),
),
_ => {
error!(target: LOG_TARGET, ?err, "error while waiting for transaction");
log.push_event(ExecutionEvent::submit_result(Err(err)))
},
};
ExecutionResult::Error(self.tx().hash())
},
}
} else {
ExecutionResult::Done(self.tx().hash())
}
},
Err(e) => {
log.push_event(ExecutionEvent::submit_result(Err(e)));
ExecutionResult::Error(self.tx().hash())
},
}
}
async fn execute(
self,
log: Arc<dyn ExecutionLog<HashType = TxTaskHash<Self>>>,
rpc: Arc<dyn TransactionsSink<TxTaskHash<Self>>>,
) -> ExecutionResult<Self> {
if self.is_watched() {
self.send_watched_tx(log, rpc).await
} else {
self.send_tx(log, rpc).await
}
}
}
impl<T: Transaction> DefaultTxTask<T> {
pub fn new_watched(tx: T) -> Self {
Self { tx, watched: true }
}
pub fn new_unwatched(tx: T) -> Self {
Self { tx, watched: false }
}
}
/// Holds the logic that handles multiple transactions execution on a specific chain.
pub struct Runner<Task: TxTask, Sink: TransactionsSink<TxTaskHash<Task>>> {
send_threshold: usize,
logs: Logs<Task>,
transactions: Vec<Task>,
done: Vec<TxTaskHash<Task>>,
errors: Vec<TxTaskHash<Task>>,
rpc: Arc<Sink>,
stop_rx: Receiver<()>,
timeout: Option<Duration>,
event_counters: Arc<Counters>,
last_displayed: Option<Instant>,
log_file_name: Option<String>,
base_dir_path: Option<String>,
executor_id: Option<String>,
}
impl<Task: TxTask + 'static, Sink> Runner<Task, Sink>
where
Sink: TransactionsSink<TxTaskHash<Task>> + 'static,
TxTaskHash<Task>: 'static,
{
/// Instantiates a new transactions [`Runner`].
pub fn new(
send_threshold: usize,
rpc: Sink,
transactions: Vec<Task>,
log_file_name: Option<String>,
base_dir_path: Option<String>,
executor_id: Option<String>,
timeout: Option<Duration>,
) -> (Sender<()>, Self) {
let event_counters = Arc::from(Counters::default());
let logs = transactions
.iter()
.map(|t| {
(
t.tx().hash(),
Arc::new(TransactionExecutionLog::new_with_tx(t.tx(), event_counters.clone())),
)
})
.collect();
let (tx, rx) = channel(1);
(
tx,
Self {
send_threshold,
logs,
transactions,
rpc: rpc.into(),
done: Default::default(),
errors: Default::default(),
stop_rx: rx,
event_counters,
last_displayed: None,
log_file_name,
base_dir_path,
executor_id,
timeout,
},
)
}
fn pop(&mut self) -> Option<Task> {
trace!(target:LOG_TARGET, "before pop");
let r = self.transactions.pop();
trace!(target:LOG_TARGET, "after pop {}", r.is_some());
r
}
async fn consume_pending(
&mut self,
workers: &mut FuturesUnordered<Pin<Box<dyn Future<Output = ExecutionResult<Task>> + Send>>>,
) {
let (to_consume, current_count) = if self.send_threshold == usize::MAX {
(usize::MAX, None)
} else {
let current_count = self.rpc.pending_extrinsics().await;
(
self.send_threshold
.saturating_sub(current_count)
.saturating_sub(self.event_counters.buffered()),
Some(current_count),
)
};
let mut pushed = 0;
let counters_displayed = format!("{}", self.event_counters);
let mut nonces = vec![];
for _ in 0..to_consume {
let task = self.pop();
if let Some(task) = task {
nonces.push(task.tx().nonce());
let log = self.logs[&task.tx().hash()].clone();
log.push_event(ExecutionEvent::popped());
trace!(target:LOG_TARGET,task = ?&task, workers_len=workers.len(), "before push");
workers.push(task.execute(log, self.rpc.clone()));
pushed += 1;
trace!(target:LOG_TARGET,workers_len=workers.len(), "after push");
} else {
break;
};
}
let min_nonce_sent = nonces.iter().min();
let display = if let Some(last_displayed) = self.last_displayed {
last_displayed.elapsed() > Duration::from_millis(200)
} else {
true
};
if display {
self.last_displayed = Some(Instant::now());
if let Some(current_count) = current_count {
info!(
current_count,
pushed, min_nonce_sent, "consume pending {}", counters_displayed,
);
} else {
info!(pushed, min_nonce_sent, "consume pending {}", counters_displayed,);
}
};
}
fn log_file_path(&self) -> String {
let datetime: chrono::DateTime<chrono::Local> = SystemTime::now().into();
let formatted_date = datetime.format("%Y%m%d_%H%M%S");
let default_file_name = self
.executor_id
.as_ref()
.map(|id| format!("ttxt_{id}_{formatted_date}.json"))
.unwrap_or(format!("ttxt_{formatted_date}.json"));
self.base_dir_path
.as_ref()
.map(|basedir| {
let filename = self
.log_file_name
.as_ref()
.map(|filename| format!("{basedir}/{filename}_{formatted_date}"))
.unwrap_or(format!("{basedir}/{default_file_name}"));
filename
})
.unwrap_or(default_file_name)
}
/// Drives the runner to completion.
#[instrument(skip(self), fields(id = tracing::field::Empty))]
pub async fn run(&mut self) -> Logs<Task> {
let span = Span::current();
if let Some(id) = &self.executor_id {
span.record("id", id);
}
let start = Instant::now();
let original_transactions_count = self.transactions.len();
let mut workers = FuturesUnordered::new();
for _ in 0..self.send_threshold {
if let Some(t) = self.transactions.pop() {
let t = Box::new(t);
let log = self.logs[&t.tx().hash()].clone();
log.push_event(ExecutionEvent::popped());
workers.push(t.execute(log, self.rpc.clone()));
} else {
break;
}
}
let mut timeout = self.timeout.unwrap_or(Duration::from_secs(u64::MAX));
let mut timeout_reached = false;
loop {
let iteration_start = Instant::now();
select! {
_ = tokio::time::sleep(timeout) => {
timeout_reached = true;
break;
}
_ = tokio::time::sleep(Duration::from_millis(3000)) => {
self.consume_pending(&mut workers).await;
}
_ = self.stop_rx.recv() => {
info!("received termination request");
break;
}
done = workers.next() => {
match done {
Some(result) => {
debug!(target:LOG_TARGET,?result, workers_len=workers.len(), "FINISHED");
self.consume_pending(&mut workers).await;
match result {
ExecutionResult::Done(hash) => {
self.done.push(hash)
},
ExecutionResult::Error(hash) => {
self.errors.push(hash)
}
}
trace!(target:LOG_TARGET, "after match");
}
None => {
info!(target:LOG_TARGET, transactions_len = self.transactions.len(), "all futures done ");
// tokio::time::sleep(Duration::from_millis(100)).await;
// self.consume_pending(&mut workers).await;
break;
}
}
}
}
// Adjust timeout based on how long the iteration took.
timeout = timeout.saturating_sub(iteration_start.elapsed());
}
Journal::<Task>::save_logs(self.logs.clone(), Path::new(self.log_file_path().as_str()));
info!(target: STAT_TARGET, total_duration = ?start.elapsed(), ?original_transactions_count, ?timeout_reached);
make_stats(self.logs.values().cloned(), false);
self.logs.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
fake_transaction::FakeTransaction,
fake_transaction_sink::FakeTransactionsSink,
init_logger, subxt_api_connector,
subxt_transaction::{EthRuntimeConfig, EthTransaction, EthTransactionsSink},
transaction::AccountMetadata,
};
use subxt::{
config::substrate::SubstrateExtrinsicParamsBuilder as Params, dynamic::Value, OnlineClient,
};
use subxt_signer::eth::dev;
use tracing::trace;
pub type FakeTxTask = DefaultTxTask<FakeTransaction>;
#[tokio::test]
async fn oh_god() {
init_logger();
let rpc = FakeTransactionsSink::default();
let mut transactions = (0..10)
.map(|i| FakeTxTask::new_watched(FakeTransaction::new_finalizable_quick(i, 0)))
// .map(|t| Box::from(t) as Box<dyn Transaction<HashType = FakeHash>>)
.collect::<Vec<_>>();
transactions.push(FakeTxTask::new_watched(FakeTransaction::new_droppable(11u32, 0, 300)));
transactions.push(FakeTxTask::new_watched(FakeTransaction::new_invalid(11u32, 0, 300)));
transactions.push(FakeTxTask::new_unwatched(FakeTransaction::new_invalid(11u32, 0, 300)));
transactions.push(FakeTxTask::new_unwatched(FakeTransaction::new_error(12u32, 0, 300)));
let (_c, mut r) = Runner::<DefaultTxTask<FakeTransaction>, FakeTransactionsSink>::new(
5,
rpc,
transactions,
None,
None,
None,
None,
);
r.run().await;
}
type EthTestTxTask = DefaultTxTask<EthTransaction>;
async fn make_eth_test_transaction(
api: &OnlineClient<EthRuntimeConfig>,
mortality: Option<u64>,
nonce: u64,
) -> EthTransaction {
let alith = dev::alith();
let baltathar = dev::baltathar();
let mut tx_params = Params::new().nonce(nonce);
if let Some(mortal) = mortality {
tx_params = tx_params.mortal(mortal);
}
let tx_params = tx_params.build();
// let tx_call = subxt::dynamic::tx("System", "remark",
// vec![Value::from_bytes("heeelooo")]);
let tx_call = subxt::dynamic::tx(
"Balances",
"transfer_keep_alive",
vec![
// // Substrate:
// Value::unnamed_variant("Id", [Value::from_bytes(receiver.public())]),
// Eth:
Value::unnamed_composite(vec![Value::from_bytes(alith.public_key())]),
Value::u128(1u32.into()),
],
);
// TODO: implement retry logic if failures occur due to `create_partial` call.
let tx = EthTransaction::new(
api.tx()
.create_partial(&tx_call, &baltathar.public_key().to_account_id(), tx_params)
.await
.unwrap()
.sign(&baltathar),
nonce as u128,
mortality,
AccountMetadata::KeyRing("baltathar".to_string()),
);
trace!(target:LOG_TARGET,"tx hash: {:?}", tx.hash());
tx
}
// This test needs a network up with a collator responding on 127.0.0.1:9933
#[ignore]
#[tokio::test]
async fn oh_god2() {
init_logger();
// let api = OnlineClient::<EthRuntimeConfig>::from_insecure_url("ws://127.0.0.1:9933")
// .await
// .unwrap();
let api = subxt_api_connector::connect("ws://127.0.0.1:9933", false).await.unwrap();
let rpc = EthTransactionsSink::new().await;
let mut transactions = Vec::new();
for i in 0..3000 {
transactions
.push(EthTestTxTask::new_watched(make_eth_test_transaction(&api, None, i).await));
}
transactions.reverse();
let (_c, mut r) = Runner::<DefaultTxTask<EthTransaction>, EthTransactionsSink>::new(
10_000,
rpc,
transactions,
None,
None,
None,
None,
);
r.run().await;
}
#[tokio::test]
async fn resubmit() {
init_logger();
let rpc = FakeTransactionsSink::default();
let transactions = (0..100000)
.map(|i| FakeTxTask::new_watched(FakeTransaction::new_droppable_2nd_success(i, 0, 0)))
// .map(|t| Box::from(t) as Box<dyn Transaction<HashType = FakeHash>>)
.collect::<Vec<_>>();
let (_, mut r) = Runner::<DefaultTxTask<FakeTransaction>, FakeTransactionsSink>::new(
100000,
rpc,
transactions,
None,
None,
None,
None,
);
r.run().await;
}
#[tokio::test]
async fn read_json() {
init_logger();
let logs = Journal::<DefaultTxTask<FakeTransaction>>::load_logs(
"tests/out_20250206_151339.json",
&None,
);
make_stats(logs.values().cloned(), true);
}
}
+732
View File
@@ -0,0 +1,732 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
//! Provides transactions sending scenarios together with associated builders and executors.
use std::{collections::HashMap, ops::Range, sync::Arc, time::Duration};
use clap::{Subcommand, ValueEnum};
use futures::executor::block_on;
use subxt::utils::H256;
use subxt_core::config::Hash as BlockHash;
use tokio::sync::mpsc::Sender;
use crate::{
block_monitor::BlockMonitor,
execution_log::TransactionExecutionLog,
runner::{DefaultTxTask, Runner, TxTask},
subxt_transaction::{
eth_transfer_payload_builder, generate_ecdsa_keypair, generate_sr25519_keypair,
remark_payload_builder, sub_transfer_payload_builder, EthPayloadBuilderFn, EthTransaction,
EthTransactionsSink, EthTxBuildContext, SubPayloadBuilderFn, SubTxBuildContext,
SubstrateTransaction, SubstrateTransactionsSink,
},
transaction::{
BuildTransactionParams, EthTransactionBuilder, SubstrateTransactionBuilder, Transaction,
TransactionBuilder, TransactionCall, TransactionRecipe, TransactionsSink,
},
};
use subxt::tx::DynamicPayload;
#[derive(Clone, Debug)]
/// Holds information relevant for transaction generation.
pub(crate) struct TransactionBuildParams {
pub account: String,
pub nonce: Option<u128>,
pub mortality: Option<u64>,
}
#[derive(Debug, Clone)]
/// Describes the account types that will participate
/// in a [`ScenarioType`].
pub enum AccountsDescription {
Keyring(String),
Derived(Range<u32>),
}
#[derive(ValueEnum, Clone)]
pub enum ChainType {
/// Substrate compatible chain.
Sub,
/// Etheruem compatible chain.
Eth,
/// A fake chain used for experiments & tests.
Fake,
}
#[derive(Subcommand, Clone)]
/// This enum represents different transactions sending scenarios.
pub enum ScenarioType {
/// Send single transaction to the node.
OneShot {
/// Account identifier to be used. It can be keyring account (alice, bob,...) or number of
/// pre-funded account, index used for derivation.
#[clap(long, default_value = "alice")]
account: String,
/// Nonce used for the account.
#[clap(long)]
nonce: Option<u128>,
},
/// Send multiple transactions to the node using a single account.
FromSingleAccount {
/// Account identifier to be used. It can be keyring account (alice, bob,...) or number of
/// pre-funded account, index used for derivation.
#[clap(long, default_value = "alice")]
account: String,
/// Starting nonce for 1st transaction in the batch. If not given the current nonce for
/// the account will be fetched from node for the first transaction in the batch.
#[clap(long)]
from: Option<u128>,
/// Number of transaction in the batch.
#[clap(long, default_value_t = 1)]
count: u32,
},
/// Send multiple transactions to the node using multiple accounts.
FromManyAccounts {
/// First account identifier to be used (index of the pre-funded account used for a
/// derivation).
#[clap(long)]
start_id: u32,
/// Last account identifier to be used.
#[clap(long)]
last_id: u32,
/// Starting nonce of transactions batch. If not given the current nonce for each account
/// will be fetched from node.
#[clap(long)]
from: Option<u128>,
/// Number of transaction in the batch per account.
#[clap(long, default_value_t = 1)]
count: u32,
},
}
pub type EthScenarioRunner = Runner<DefaultTxTask<EthTransaction>, EthTransactionsSink>;
pub struct EthScenarioExecutor {
stop_sender: Sender<()>,
runner: EthScenarioRunner,
}
pub type SubstrateScenarioRunner =
Runner<DefaultTxTask<SubstrateTransaction>, SubstrateTransactionsSink>;
pub struct SubstrateScenarioExecutor {
stop_sender: Sender<()>,
runner: SubstrateScenarioRunner,
}
impl SubstrateScenarioExecutor {
pub(crate) fn new(stop_sender: Sender<()>, runner: SubstrateScenarioRunner) -> Self {
SubstrateScenarioExecutor { stop_sender, runner }
}
}
impl EthScenarioExecutor {
pub(crate) fn new(stop_sender: Sender<()>, runner: EthScenarioRunner) -> Self {
EthScenarioExecutor { stop_sender, runner }
}
}
/// Multi-chain scenario executor.
pub enum ScenarioExecutor {
Eth(EthScenarioExecutor),
Substrate(SubstrateScenarioExecutor),
}
impl ScenarioExecutor {
/// Executes the encapsulated scenario to send out transactions.
///
/// Executes the set of transaction sending tasks, and follows a transaction status on the node
/// side until a final state is reached.
///
/// It returns a mapping of transaction hashes to their respective execution log entries,
/// providing a detailed view of the transaction's execution process.
///
/// It is subject to the configured timeout, and if it will be reached, will return a subset of
/// the execution logs.
pub async fn execute(self) -> HashMap<H256, Arc<TransactionExecutionLog<H256>>> {
match self {
ScenarioExecutor::Eth(mut inner) => inner.runner.run().await,
ScenarioExecutor::Substrate(mut inner) => inner.runner.run().await,
}
}
/// Installs a ctrl_c handler which sends a stop notification on the executor
/// stop sender channel, to notify the stop of the scenario for displaying partial stats about
/// the transactions execution.
///
/// Can be called only once for the lifetime of a the program execution.
fn install_ctrlc_stop_hook(&self) {
let stop_sender = match &self {
ScenarioExecutor::Eth(inner) => inner.stop_sender.clone(),
ScenarioExecutor::Substrate(inner) => inner.stop_sender.clone(),
};
ctrlc::set_handler(move || {
block_on(stop_sender.send(())).expect("Could not send signal on channel.")
})
.expect("Error setting Ctrl-C handler");
}
}
/// Source of the transaction payload builder.
enum TxPayloadBuilderSource {
/// Recipe to be resolved to a builder in `build()` based on chain_type.
Recipe(TransactionRecipe),
/// Custom Substrate payload builder.
SubCustom(SubPayloadBuilderFn),
/// Custom Ethereum payload builder.
EthCustom(EthPayloadBuilderFn),
}
impl TxPayloadBuilderSource {
/// Resolve the source into a Substrate payload builder.
fn into_sub_builder(self) -> SubPayloadBuilderFn {
match self {
Self::SubCustom(f) => f,
Self::Recipe(recipe) => match recipe.call {
TransactionCall::Remark(size_kb) => remark_payload_builder(size_kb),
TransactionCall::Transfer => sub_transfer_payload_builder(),
},
Self::EthCustom(_) => {
panic!("EthCustom payload builder cannot be used with ChainType::Sub")
},
}
}
/// Resolve the source into an Ethereum payload builder.
fn into_eth_builder(self) -> EthPayloadBuilderFn {
match self {
Self::EthCustom(f) => f,
Self::Recipe(recipe) => match recipe.call {
TransactionCall::Remark(size_kb) => remark_payload_builder(size_kb),
TransactionCall::Transfer => eth_transfer_payload_builder(),
},
Self::SubCustom(_) => {
panic!("SubCustom payload builder cannot be used with ChainType::Eth")
},
}
}
}
/// Building logic for the execution of a scenario.
pub struct ScenarioBuilder {
account_id: Option<String>,
start_id: Option<u32>,
last_id: Option<u32>,
nonce_from: Option<u128>,
txs_count: u32,
tx_payload_builder_source: Option<TxPayloadBuilderSource>,
mortality: Option<u64>,
does_block_monitoring: bool,
watched_txs: bool,
send_threshold: Option<usize>,
rpc_uri: Option<String>,
chain_type: Option<ChainType>,
installs_ctrl_c_stop_hook: bool,
executor_id: Option<String>,
tip: u128,
log_file_name_prefix: Option<String>,
base_dir_path: Option<String>,
timeout: Option<Duration>,
use_legacy_backend: bool,
}
impl Default for ScenarioBuilder {
fn default() -> Self {
Self::new()
}
}
impl ScenarioBuilder {
/// A default initializer of the builder, with a few defaults:
/// - `tx_payload_builder_source` is set to transfer recipe.
/// - `does_block_monitoring` is set to `false`.
/// - `installs_ctrl_c_stop_hook` is set to `false`.
/// - `send_threshold` is set to `1000`.
pub fn new() -> Self {
ScenarioBuilder {
account_id: None,
start_id: None,
last_id: None,
nonce_from: None,
txs_count: 1,
tx_payload_builder_source: Some(TxPayloadBuilderSource::Recipe(
TransactionRecipe::transfer(),
)),
does_block_monitoring: false,
mortality: None,
watched_txs: false,
send_threshold: Some(1000),
rpc_uri: None,
chain_type: None,
installs_ctrl_c_stop_hook: false,
executor_id: None,
tip: 0,
log_file_name_prefix: None,
base_dir_path: None,
timeout: None,
use_legacy_backend: false,
}
}
/// Configure the account id for building a batch of transactions based on a single signer.
/// The setter parameter is a string that can be in the form of a number, in which case it will
/// behave the same as using `with_start_id` (without `with_last_id`), but it can receive
/// a derivation path like the usual Polkadot development accounts (e.g. "alice", "bob", etc).
pub fn with_account_id(mut self, account_id: String) -> Self {
self.account_id = Some(account_id);
self
}
/// Configure the account id for the first signer used for the transactions building.
/// If the builder isn't configured with a last signer account id, then the scenario
/// builder will build transactions only for the account specified with this setter.
///
/// It is usually used in pair with `with_last_id`, to set an ids range where each id will be
/// the last part of a derivation path used for multiple accounts generation, each being a
/// signer for a batch of transactions.
pub fn with_start_id(mut self, start_id: u32) -> Self {
self.start_id = Some(start_id);
self
}
/// Last id of an account signer that is also representing the end of an ids range,
/// each id being the last part of a derivation path used to generate accounts that sign a set
/// of transactions (see
/// [`crate::subxt_transaction::derive_accounts`]).
pub fn with_last_id(mut self, last_id: u32) -> Self {
self.last_id = Some(last_id);
self
}
/// The start of a nonce counter that's incremented with each built transaction, in relation to
/// a specific signer account (that can be also part of a range of accounts as it happens when
/// both `start_id` and `last_id` parameters of the builder are set), while the number of the
/// built transactions is lower than `txs_count`.
pub fn with_nonce_from(mut self, nonce_from: Option<u128>) -> Self {
self.nonce_from = nonce_from;
self
}
/// The number of the transactions that will be built in relation to a signer account.
pub fn with_txs_count(mut self, txs_count: u32) -> Self {
self.txs_count = txs_count;
self
}
/// Sets transaction recipe to a regular balances transfer.
///
/// The builder is already initialised with a transfer transaction recipe.
pub fn with_transfer_recipe(mut self) -> Self {
self.tx_payload_builder_source =
Some(TxPayloadBuilderSource::Recipe(TransactionRecipe::transfer()));
self
}
/// Set a remark transaction recipe.
pub fn with_remark_recipe(mut self, remark: u32) -> Self {
self.tx_payload_builder_source =
Some(TxPayloadBuilderSource::Recipe(TransactionRecipe::remark(remark)));
self
}
/// Set a custom payload builder for Substrate chains.
///
/// The closure receives a `SubTxBuildContext` with account info, nonce, etc.
pub fn with_tx_payload_builder_sub<F>(mut self, f: F) -> Self
where
F: Fn(&SubTxBuildContext) -> DynamicPayload + Send + Sync + 'static,
{
self.tx_payload_builder_source = Some(TxPayloadBuilderSource::SubCustom(Arc::new(f)));
self
}
/// Set a custom payload builder for Ethereum chains.
///
/// The closure receives an `EthTxBuildContext` with account info, nonce, etc.
pub fn with_tx_payload_builder_eth<F>(mut self, f: F) -> Self
where
F: Fn(&EthTxBuildContext) -> DynamicPayload + Send + Sync + 'static,
{
self.tx_payload_builder_source = Some(TxPayloadBuilderSource::EthCustom(Arc::new(f)));
self
}
/// Allows to specify transaction tip. This indirectly controls priority of transaction.
pub fn with_tip(mut self, tip: u128) -> Self {
self.tip = tip;
self
}
/// Spawns block monitor. Allows to monitor the transaction finalization status for unwatched
/// transactions.
pub fn with_block_monitoring(mut self, does_block_monitoring: bool) -> Self {
self.does_block_monitoring = does_block_monitoring;
self
}
/// Sets for how many blocks a transaction is considered valid.
///
/// Note: using this setter can increase the transaction creation times which can impact heavy
/// load tests that create millions of transactions. This method instructs a scenario to use
/// an online client for txs creation, since creating mortal txs requires knowledge about the
/// last finalized block on chain.
pub fn with_mortality(mut self, mortality: u64) -> Self {
self.mortality = Some(mortality);
self
}
/// Defines the URI of the node where transactions are dispatched.
pub fn with_rpc_uri(mut self, rpc_uri: String) -> Self {
self.rpc_uri = Some(rpc_uri);
self
}
/// Send transactions using `submit_and_watch` method. Progress of all transcations will be
/// monitored. If using unwatched transaction `Self::with_block_monitoring` may be useful for
/// tracking finalization of transactions.
pub fn with_watched_txs(mut self, watched_txs: bool) -> Self {
self.watched_txs = watched_txs;
self
}
/// Allows to specify the chain type.
pub fn with_chain_type(mut self, chain_type: ChainType) -> Self {
self.chain_type = Some(chain_type);
self
}
/// Specifies how many transactions in transaction pool on the node side will be maintained at
/// the fork of the best chain.
///
/// `usize::MAX` means that the count of `pending_extrinsics` on node side is not called, and an
/// executor will send as much as possible.
pub fn with_send_threshold(mut self, send_threshold: usize) -> Self {
self.send_threshold = Some(send_threshold);
self
}
/// If specified, the stats will be printed when `stop` signal is sent to process.
pub fn with_installed_ctrlc_stop_hook(mut self, installs_ctrl_c_stop_hook: bool) -> Self {
self.installs_ctrl_c_stop_hook = installs_ctrl_c_stop_hook;
self
}
/// Sets a maximum duration for the scenario execution.
///
/// If specified, execution will be limited to the given timeout, ensuring the executor returns
/// with logs if the duration is reached. Typically, the scenario will complete earlier, but
/// the timeout acts as a safeguard to prevent indefinite execution.
pub fn with_timeout_in_secs(mut self, secs: u64) -> Self {
self.timeout = Some(Duration::from_secs(secs));
self
}
/// Defines the log prefix for the executor instance being built.
pub fn with_executor_id(mut self, executor_id: String) -> Self {
self.executor_id = Some(executor_id);
self
}
/// Defines the prefix of the log name.
pub fn with_log_file_name_prefix(mut self, log_file_name_prefix: String) -> Self {
self.log_file_name_prefix = Some(log_file_name_prefix);
self
}
/// Defines the path of the directory where the log file will be stored.
pub fn with_base_dir_path(mut self, base_dir_path: String) -> Self {
self.base_dir_path = Some(base_dir_path);
self
}
/// Use legacy backend. In some scenarios using this may help overcome some RPC related
/// problems. Shall be removed in some point in future.
pub fn with_legacy_backend(mut self, use_legacy_backend: bool) -> Self {
self.use_legacy_backend = use_legacy_backend;
self
}
/// Returns a set of tasks that handle transaction execution.
async fn build_transactions<H, T, S, B>(
&self,
builder: B,
sink: S,
tip: u128,
payload_builder: B::PayloadBuilder,
) -> Vec<DefaultTxTask<T>>
where
H: BlockHash + 'static,
T: Transaction<HashType = H> + Send + 'static,
S: TransactionsSink<H> + 'static + Clone,
B: TransactionBuilder<HashType = H, Transaction = T, Sink = S> + Send + Sync + 'static,
B::PayloadBuilder: Clone,
{
let mut tx_build_params = vec![];
if let Some(start_id) = self.start_id {
let last_id = self.last_id.unwrap_or(start_id);
for account in start_id..=last_id {
let mut nonce = self.nonce_from;
for _ in 0..self.txs_count {
tx_build_params.push(TransactionBuildParams {
account: account.to_string(),
nonce,
mortality: self.mortality,
});
nonce = nonce.map(|n| n + 1);
}
}
} else {
let mut nonce = self.nonce_from;
let account = self
.account_id
.clone()
.expect("to have configured an account id for transactions generation");
for _ in 0..self.txs_count {
tx_build_params.push(TransactionBuildParams {
account: account.clone(),
nonce,
mortality: self.mortality,
});
nonce = nonce.map(|n| n + 1);
}
}
let n = tx_build_params.len();
let t = std::cmp::min(
n,
std::thread::available_parallelism().unwrap_or(1usize.try_into().unwrap()).get(),
);
let tx_build_params = Arc::<Vec<TransactionBuildParams>>::from(tx_build_params);
let builder = Arc::new(builder);
let payload_builder = Arc::new(payload_builder);
let mut threads = Vec::new();
(0..t).for_each(|thread_idx| {
let chunk = ((thread_idx * n) / t)..(((thread_idx + 1) * n) / t);
let tx_build_params = tx_build_params.clone();
let builder = builder.clone();
let sink = sink.clone();
let payload_builder = payload_builder.clone();
let watched_txs = self.watched_txs;
threads.push(tokio::task::spawn(async move {
let mut txs = vec![];
for i in chunk {
let build_params = tx_build_params[i].clone();
txs.push(
builder
.build_transaction(
watched_txs,
BuildTransactionParams {
account: &build_params.account,
nonce: &build_params.nonce,
mortality: &build_params.mortality,
tip,
},
&sink,
&*payload_builder,
)
.await,
);
}
txs
}));
});
let mut results = vec![];
for handle in threads {
let result = handle.await.unwrap();
results.push(result);
}
let mut txs: Vec<_> = results.into_iter().flatten().collect();
txs.sort_by_key(|k| k.tx().nonce());
txs
}
/// Returns a runner of transactions for the configured scenario.
pub async fn build(mut self) -> ScenarioExecutor {
let does_block_monitoring = self.does_block_monitoring;
let send_threshold =
self.send_threshold.expect("to have configured the send threshold. qed.");
let rpc_uri = self.rpc_uri.clone().expect("to have configured the rpc uri. qed.");
let chain_type = self.chain_type.clone().expect("to have a configured chain type. qed");
let accounts_description = if let Some(start_id) = self.start_id {
let last_id = self.last_id.unwrap_or(start_id);
AccountsDescription::Derived(start_id..last_id + 1)
} else if let Some(account_description) = self
.account_id
.clone()
.and_then(|id| id.parse::<u32>().ok())
.map(|id| AccountsDescription::Derived(id..id + 1))
{
account_description
} else {
AccountsDescription::Keyring(
self.account_id
.clone()
.expect("to have configured an account id for transactions generation"),
)
};
let installs_ctrlc_stop_hook = self.installs_ctrl_c_stop_hook;
let tip = self.tip;
match chain_type {
ChainType::Eth => {
let payload_builder = self
.tx_payload_builder_source
.take()
.expect("No payload source configured")
.into_eth_builder();
let builder = EthTransactionBuilder::default();
let new_with_uri_with_accounts_description =
EthTransactionsSink::new_with_uri_with_accounts_description(
rpc_uri.as_str(),
accounts_description,
generate_ecdsa_keypair,
if does_block_monitoring {
Some(BlockMonitor::new(rpc_uri.as_str()).await)
} else {
None
},
self.use_legacy_backend,
);
let sink = new_with_uri_with_accounts_description.await;
let txs =
self.build_transactions(builder, sink.clone(), tip, payload_builder).await;
let (stop_sender, runner) =
Runner::<DefaultTxTask<EthTransaction>, EthTransactionsSink>::new(
send_threshold,
sink,
txs.into_iter().rev().collect(),
self.log_file_name_prefix,
self.base_dir_path,
self.executor_id,
self.timeout,
);
let executor = ScenarioExecutor::Eth(EthScenarioExecutor::new(stop_sender, runner));
installs_ctrlc_stop_hook.then(|| executor.install_ctrlc_stop_hook());
executor
},
ChainType::Sub => {
let payload_builder = self
.tx_payload_builder_source
.take()
.expect("No payload source configured")
.into_sub_builder();
let builder = SubstrateTransactionBuilder::default();
let sink = SubstrateTransactionsSink::new_with_uri_with_accounts_description(
rpc_uri.as_str(),
accounts_description,
generate_sr25519_keypair,
if does_block_monitoring {
Some(BlockMonitor::new(rpc_uri.as_str()).await)
} else {
None
},
self.use_legacy_backend,
)
.await;
let txs =
self.build_transactions(builder, sink.clone(), tip, payload_builder).await;
let (stop_sender, runner) =
Runner::<DefaultTxTask<SubstrateTransaction>, SubstrateTransactionsSink>::new(
send_threshold,
sink,
txs.into_iter().rev().collect(),
self.log_file_name_prefix,
self.base_dir_path,
self.executor_id,
self.timeout,
);
let executor = ScenarioExecutor::Substrate(SubstrateScenarioExecutor::new(
stop_sender,
runner,
));
installs_ctrlc_stop_hook.then(|| executor.install_ctrlc_stop_hook());
executor
},
ChainType::Fake => unimplemented!(),
}
}
}
#[cfg(test)]
mod tests {
use crate::{
fake_transaction_sink::FakeTransactionsSink,
scenario::ScenarioBuilder,
transaction::{AccountMetadata, FakeTransactionBuilder},
};
use crate::{runner::TxTask, transaction::Transaction};
#[tokio::test]
async fn build_tx_tasks_based_on_scenario_type() {
// One shot from derived account based on number id.
let sink = FakeTransactionsSink::default();
let builder = FakeTransactionBuilder;
let scenario_builder = ScenarioBuilder::new().with_start_id(0).with_nonce_from(Some(0));
let tasks = scenario_builder.build_transactions(builder, sink, 0, ()).await;
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0].tx().nonce(), 0);
assert_eq!(tasks[0].tx().account_metadata(), AccountMetadata::Derived(0));
// One shot from derived account.
let sink = FakeTransactionsSink::default();
let builder = FakeTransactionBuilder;
let scenario_builder = ScenarioBuilder::new()
.with_account_id("alice".to_string())
.with_nonce_from(Some(0));
let tasks = scenario_builder.build_transactions(builder, sink, 0, ()).await;
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0].tx().nonce(), 0);
assert_eq!(tasks[0].tx().account_metadata(), AccountMetadata::KeyRing("alice".to_string()));
// Build from single derived account based on number id.
let sink = FakeTransactionsSink::default();
let builder = FakeTransactionBuilder;
let scenario_builder = ScenarioBuilder::new()
.with_start_id(1)
.with_nonce_from(Some(0))
.with_txs_count(10);
let tasks = scenario_builder.build_transactions(builder, sink, 0, ()).await;
assert_eq!(tasks.len(), 10);
for (i, task) in tasks.iter().enumerate() {
assert_eq!(task.tx().nonce(), i as u128);
assert_eq!(task.tx().account_metadata(), AccountMetadata::Derived(1));
}
// Buld from single account keyring.
let sink = FakeTransactionsSink::default();
let builder = FakeTransactionBuilder;
let scenario_builder = ScenarioBuilder::new()
.with_account_id("alice".to_string())
.with_nonce_from(Some(0))
.with_txs_count(10);
let tasks = scenario_builder.build_transactions(builder, sink, 0, ()).await;
assert_eq!(tasks.len(), 10);
for (i, task) in tasks.iter().enumerate() {
assert_eq!(task.tx().nonce(), i as u128);
assert_eq!(task.tx().account_metadata(), AccountMetadata::KeyRing("alice".to_string()));
}
// Buld from many derived accounts based on number ids.
let sink = FakeTransactionsSink::default();
let builder = FakeTransactionBuilder;
let scenario_builder = ScenarioBuilder::new()
.with_start_id(5)
.with_last_id(10)
.with_nonce_from(Some(0))
.with_txs_count(10);
let tasks = scenario_builder.build_transactions(builder, sink, 0, ()).await;
assert_eq!(tasks.len(), 60);
for (i, task) in tasks.iter().enumerate() {
assert_eq!(task.tx().nonce(), i as u128 / 6);
assert_eq!(task.tx().account_metadata(), AccountMetadata::Derived((i as u32 % 6) + 5));
}
}
}
@@ -0,0 +1,55 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use std::{error::Error, sync::Arc, time::Duration};
use subxt::OnlineClient;
use tracing::info;
use crate::helpers;
const MAX_ATTEMPTS: usize = 10;
/// Delay period between failed connection attempts.
const RETRY_DELAY: Duration = Duration::from_secs(1);
/// Connect to a RPC node.
pub(crate) async fn connect<C: subxt::Config>(
url: &str,
use_legacy_backend: bool,
) -> Result<OnlineClient<C>, Box<dyn Error>> {
for i in 0..MAX_ATTEMPTS {
info!("Attempt #{}: Connecting to {}", i, url);
let maybe_client = if use_legacy_backend {
let backend = subxt::backend::legacy::LegacyBackend::builder()
.build(subxt::backend::rpc::RpcClient::new(helpers::client(url).await?));
OnlineClient::from_backend(Arc::new(backend)).await
} else {
let backend = subxt::backend::chain_head::ChainHeadBackend::builder()
.transaction_timeout(6 * 3600)
//note: This required new subxt release
// subxt 0.42.1:
// .submit_transactions_ignoring_follow_events()
.build_with_background_driver(subxt::backend::rpc::RpcClient::new(
helpers::client(url).await?,
));
OnlineClient::from_backend(Arc::new(backend)).await
};
// let maybe_client = OnlineClient::<EthRuntimeConfig>::from_rpc_client(client);
match maybe_client {
Ok(client) => {
info!("Connection established to: {}", url);
return Ok(client);
},
Err(err) => {
info!("API client {} error: {:?}", url, err);
tokio::time::sleep(RETRY_DELAY).await;
},
};
}
let err = format!("Failed to connect to {url} after {MAX_ATTEMPTS} attempts");
info!("{err}");
Err(err.into())
}
@@ -0,0 +1,763 @@
use crate::{
block_monitor::BlockMonitor,
error::Error,
helpers::StreamOf,
scenario::AccountsDescription,
transaction::{
AccountMetadata, Transaction, TransactionMonitor, TransactionStatus, TransactionsSink,
},
};
use async_trait::async_trait;
use futures::StreamExt;
use parking_lot::RwLock;
use std::{
any::Any,
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
pub use subxt::dynamic;
use subxt::{
backend::rpc::RpcClient,
config::{
transaction_extensions::{
ChargeAssetTxPaymentParams, ChargeTransactionPaymentParams, CheckMortalityParams,
CheckNonceParams,
},
DefaultExtrinsicParams, ExtrinsicParams,
},
dynamic::{At, Value},
ext::scale_value::value,
tx::{DynamicPayload, PartialTransaction, Signer, SubmittableTransaction},
OnlineClient, PolkadotConfig,
};
use subxt_core::{config::SubstrateExtrinsicParamsBuilder, utils::AccountId20};
use subxt_signer::{
eth::{dev as eth_dev, Keypair as EthKeypair, Signature},
sr25519::{dev as sr25519_dev, Keypair as SrPair},
};
use tracing::{debug, error, trace};
const LOG_TARGET: &str = "subxt_tx";
const DEFAULT_RETRIES_FOR_PARTIAL_TX_CREATION: usize = 10;
#[derive(Clone)]
/// Ethereum runtime config definition for subxt usage purposes.
pub enum EthRuntimeConfig {}
impl subxt::Config for EthRuntimeConfig {
type AccountId = AccountId20;
type Address = AccountId20;
type Signature = Signature;
type Hasher = subxt::config::substrate::BlakeTwo256;
type Header =
subxt::config::substrate::SubstrateHeader<u32, subxt::config::substrate::BlakeTwo256>;
type ExtrinsicParams = subxt::config::SubstrateExtrinsicParams<Self>;
type AssetId = u32;
}
/// Type alias for subxt config hash (Output of Hasher).
pub(crate) type HashOf<C> = <<C as subxt::Config>::Hasher as subxt::config::Hasher>::Output;
/// Type alias for subxt account id.
pub(crate) type AccountIdOf<C> = <C as subxt::Config>::AccountId;
/// A subxt transaction abstraction.
#[derive(Clone)]
pub struct SubxtTransaction<C: subxt::Config> {
transaction: Arc<SubmittableTransaction<C, OnlineClient<C>>>,
nonce: u128,
valid_until: Option<u64>,
account_metadata: AccountMetadata,
}
/// Transaction type thart runs on `Ethereum` compatible chains.
pub type EthTransaction = SubxtTransaction<EthRuntimeConfig>;
/// Holds the RPC API connection for transaction execution.
pub type EthTransactionsSink = SubxtTransactionsSink<EthRuntimeConfig, EthKeypair>;
/// Transaction type that runs on `substrate` compatible chains.
pub type SubstrateTransaction = SubxtTransaction<PolkadotConfig>;
/// Holds the RPC API connection for transaction execution.
pub type SubstrateTransactionsSink = SubxtTransactionsSink<PolkadotConfig, SrPair>;
/// Context for building transaction payloads.
/// Generic over account type `A` to support both Substrate and Ethereum chains.
pub struct TxPayloadBuildContext<'a, A> {
/// The destination account ID.
pub to_account_id: &'a A,
/// The source account ID (signer).
pub from_account_id: &'a A,
/// Account string identifier (e.g. "0", "alice").
pub account: &'a str,
/// Transaction nonce.
pub nonce: u128,
}
/// Context type alias for Substrate chains.
pub type SubTxBuildContext<'a> = TxPayloadBuildContext<'a, AccountIdOf<PolkadotConfig>>;
/// Context type alias for Ethereum chains.
pub type EthTxBuildContext<'a> = TxPayloadBuildContext<'a, AccountId20>;
/// Generic payload builder function type.
pub type PayloadBuilderFn<A> =
Arc<dyn Fn(&TxPayloadBuildContext<A>) -> DynamicPayload + Send + Sync>;
/// Payload builder type alias for Substrate chains.
pub type SubPayloadBuilderFn = PayloadBuilderFn<AccountIdOf<PolkadotConfig>>;
/// Payload builder type alias for Ethereum chains.
pub type EthPayloadBuilderFn = PayloadBuilderFn<AccountId20>;
/// Creates a generic remark payload builder.
/// Works for any account type that implements `AsRef<[u8]>`.
/// Size is specified in kilobytes.
pub fn remark_payload_builder<A>(size_kb: u32) -> PayloadBuilderFn<A>
where
A: AsRef<[u8]> + Send + Sync + 'static,
{
Arc::new(move |ctx| {
let i = hex::encode(ctx.to_account_id.as_ref()).as_bytes().last().copied().unwrap();
let data = vec![i; size_kb as usize * 1024];
subxt::dynamic::tx("System", "remark", vec![data])
})
}
/// Creates a transfer payload builder for Substrate chains.
pub fn sub_transfer_payload_builder() -> SubPayloadBuilderFn {
Arc::new(|ctx| {
subxt::dynamic::tx(
"Balances",
"transfer_keep_alive",
vec![
value!(Id(Value::from_bytes(ctx.to_account_id.clone()))),
Value::u128(1u32.into()),
],
)
})
}
/// Creates a transfer payload builder for Ethereum chains.
pub fn eth_transfer_payload_builder() -> EthPayloadBuilderFn {
Arc::new(|ctx| {
subxt::dynamic::tx(
"Balances",
"transfer_keep_alive",
vec![
Value::unnamed_composite(vec![Value::from_bytes(*ctx.to_account_id)]),
Value::u128(1u32.into()),
],
)
})
}
impl<C: subxt::Config> SubxtTransaction<C> {
pub fn new(
transaction: SubmittableTransaction<C, OnlineClient<C>>,
nonce: u128,
valid_until: Option<u64>,
account_metadata: AccountMetadata,
) -> Self {
Self { transaction: Arc::new(transaction), nonce, account_metadata, valid_until }
}
}
// type TransactionSubxt2 = subxt::tx::DynamicPayload;
impl<C: subxt::Config> Transaction for SubxtTransaction<C> {
type HashType = HashOf<C>;
fn hash(&self) -> Self::HashType {
self.transaction.hash()
}
fn as_any(&self) -> &dyn Any {
self
}
fn nonce(&self) -> u128 {
self.nonce
}
fn account_metadata(&self) -> AccountMetadata {
self.account_metadata.clone()
}
fn valid_until(&self) -> &Option<u64> {
&self.valid_until
}
}
#[derive(Clone)]
pub struct SubxtTransactionsSink<C: subxt::Config, KP: Signer<C>> {
api: OnlineClient<C>,
from_accounts: Arc<RwLock<HashMap<String, (KP, AccountMetadata)>>>,
to_accounts: Arc<RwLock<HashMap<String, (KP, AccountMetadata)>>>,
nonces: Arc<RwLock<HashMap<String, u128>>>,
rpc_client: RpcClient,
current_pending_extrinsics: Arc<RwLock<Option<(Instant, usize)>>>,
block_monitor: Option<BlockMonitor<C>>,
}
const EXPECT_CONNECT: &str = "should connect to rpc client";
impl<C, KP> SubxtTransactionsSink<C, KP>
where
AccountIdOf<C>: Send + Sync + AsRef<[u8]>,
KP: Signer<C> + Clone + Send + Sync + 'static,
C: subxt::Config,
{
pub async fn new() -> Self {
Self {
api: crate::subxt_api_connector::connect("ws://127.0.0.1:9933", false)
.await
.expect(EXPECT_CONNECT),
from_accounts: Default::default(),
to_accounts: Default::default(),
nonces: Default::default(),
rpc_client: RpcClient::from_url("ws://127.0.0.1:9933").await.expect(EXPECT_CONNECT),
current_pending_extrinsics: Arc::new(None.into()),
block_monitor: None,
}
}
pub async fn new_with_uri(uri: &String) -> Self {
Self {
api: crate::subxt_api_connector::connect(uri, false).await.expect(EXPECT_CONNECT),
from_accounts: Default::default(),
to_accounts: Default::default(),
nonces: Default::default(),
rpc_client: RpcClient::from_url(uri).await.expect(EXPECT_CONNECT),
current_pending_extrinsics: Arc::new(None.into()),
block_monitor: None,
}
}
pub async fn new_with_uri_with_accounts_description<G>(
uri: &str,
accounts_description: AccountsDescription,
generate_pair: G,
block_monitor: Option<BlockMonitor<C>>,
use_legacy_backend: bool,
) -> Self
where
G: GenerateKeyPairFunction<KP>,
{
let from_accounts =
derive_accounts(accounts_description.clone(), SENDER_SEED, generate_pair);
let to_accounts = derive_accounts(accounts_description, RECEIVER_SEED, generate_pair);
Self {
api: crate::subxt_api_connector::connect(uri, use_legacy_backend)
.await
.expect(EXPECT_CONNECT),
from_accounts: Arc::from(RwLock::from(from_accounts)),
to_accounts: Arc::from(RwLock::from(to_accounts)),
nonces: Default::default(),
rpc_client: crate::helpers::client(uri).await.expect(EXPECT_CONNECT).into(),
current_pending_extrinsics: Arc::new(None.into()),
block_monitor,
}
}
fn api(&self) -> OnlineClient<C> {
self.api.clone()
}
pub fn get_from_account_id(&self, account: &str) -> Option<AccountIdOf<C>> {
self.from_accounts.read().get(account).map(|a| a.0.account_id())
}
fn get_to_account_id(&self, account: &str) -> Option<AccountIdOf<C>> {
self.to_accounts.read().get(account).map(|a| a.0.account_id())
}
fn get_to_account_metadata(&self, account: &str) -> Option<AccountMetadata> {
self.to_accounts.read().get(account).map(|a| a.1.clone())
}
fn get_from_key_pair(&self, account: &str) -> Option<KP> {
self.from_accounts.read().get(account).map(|k| k.0.clone())
}
pub async fn check_account_nonce(
&self,
account: AccountIdOf<C>,
) -> Result<u128, Box<dyn std::error::Error>> {
let is_nonce_set = {
let nonces = self.nonces.read();
nonces.get(&hex::encode(account.clone())).cloned()
};
let remote_nonce = if let Some(nonce) = is_nonce_set {
nonce
} else {
check_account_nonce(self.api.clone(), account.clone()).await?
};
let mut nonces = self.nonces.write();
if let Some(nonce) = nonces.get_mut(&hex::encode(account.clone())) {
*nonce += 1;
Ok(*nonce)
} else {
nonces.insert(hex::encode(account), remote_nonce);
Ok(remote_nonce)
}
}
async fn update_count(&self) {
let i = Instant::now();
let xts_len = self
.rpc_client
.request::<Vec<serde_json::Value>>(
"author_pendingExtrinsics",
subxt_rpcs::rpc_params!(),
)
.await
.expect("author_pendingExtrinsics should not fail")
.len();
*self.current_pending_extrinsics.write() = Some((i, xts_len));
}
}
/// Fetches an account storage and returns its nonce.
pub async fn check_account_nonce<C: subxt::Config>(
api: OnlineClient<C>,
account: AccountIdOf<C>,
) -> Result<u128, Box<dyn std::error::Error>>
where
AccountIdOf<C>: Send + Sync + AsRef<[u8]>,
{
let storage_query =
subxt::dynamic::storage("System", "Account", vec![Value::from_bytes(account.clone())]);
let result = api.storage().at_latest().await?.fetch(&storage_query).await?;
let value = result
.ok_or(format!("Sender account {:?} does not exist", hex::encode(account.clone())))?
.to_value()?;
debug!(target:LOG_TARGET,"account has free balance: {:?}", value.at("data").at("free"));
debug!(target:LOG_TARGET,"account has nonce: {:?}", value.at("nonce"));
// info!("account has nonce: {:#?}", value);
let nonce = value
.at("nonce")
.ok_or("nonce is not set for the account")?
.as_u128()
.ok_or("nonce is not u128")?;
Ok(nonce)
}
#[async_trait]
impl<C, KP> TransactionsSink<HashOf<C>> for SubxtTransactionsSink<C, KP>
where
AccountIdOf<C>: Send + Sync + AsRef<[u8]>,
C: subxt::Config,
KP: Signer<C> + Clone + Send + Sync + 'static,
{
async fn submit_and_watch(
&self,
tx: &dyn Transaction<HashType = HashOf<C>>,
) -> Result<StreamOf<TransactionStatus<HashOf<C>>>, Error> {
let tx = tx.as_any().downcast_ref::<SubxtTransaction<C>>().unwrap();
let result = tx.transaction.submit_and_watch().await;
match result {
Ok(stream) => Ok(stream.map(|e| e.unwrap().into()).boxed()),
Err(e) => Err(e.into()),
}
}
async fn submit(
&self,
tx: &dyn Transaction<HashType = HashOf<C>>,
) -> Result<HashOf<C>, Error> {
let tx = tx.as_any().downcast_ref::<SubxtTransaction<C>>().unwrap();
tx.transaction.submit().await.map_err(|e| e.into())
}
/// Current count of transactions being processed by sink.
async fn pending_extrinsics(&self) -> usize {
let current_pending_extrinsics = { *self.current_pending_extrinsics.read() };
if let Some((ts, _)) = current_pending_extrinsics {
if ts.elapsed() > Duration::from_millis(1000) {
self.update_count().await;
}
} else {
self.update_count().await;
}
self.current_pending_extrinsics
.read()
.expect("current_pending_extrinsics cannot be None")
.1
}
fn transaction_monitor(&self) -> Option<&dyn TransactionMonitor<HashOf<C>>> {
self.block_monitor
.as_ref()
.map(|m| m as &dyn TransactionMonitor<HashOf<C>>)
}
}
/// Types of accounts generation.
#[derive(Debug, Clone)]
pub enum AccountGenerateRequest {
Keyring(String),
Derived(String, u32),
}
/// Seed user for sender accounts.
pub const SENDER_SEED: &str = "//Sender";
/// Seed used for receiver accounts.
pub(crate) const RECEIVER_SEED: &str = "//Receiver";
/// Generates ecdsa based keypairs.
pub fn generate_ecdsa_keypair(description: AccountGenerateRequest) -> EthKeypair {
match description {
AccountGenerateRequest::Keyring(name) => match name.as_str() {
"alice" | "alith" => eth_dev::alith(),
"bob" | "baltathar" => eth_dev::baltathar(),
"charlie" | "charleth" => eth_dev::charleth(),
"dave" | "dorothy" => eth_dev::dorothy(),
"eve" | "ethan" => eth_dev::ethan(),
"ferdie" | "faith" => eth_dev::faith(),
_ => panic!("unknown keyring name"),
},
AccountGenerateRequest::Derived(seed, i) => {
use std::str::FromStr;
let derivation = format!("{seed}//{i}");
let u = subxt_signer::SecretUri::from_str(&derivation).unwrap();
<subxt_signer::ecdsa::Keypair>::from_uri(&u).unwrap().into()
},
}
}
/// Generates sr25519 based keypairs.
pub fn generate_sr25519_keypair(description: AccountGenerateRequest) -> SrPair {
match description {
AccountGenerateRequest::Keyring(name) => match name.as_str() {
"alice" | "alith" => sr25519_dev::alice(),
"bob" | "baltathar" => sr25519_dev::bob(),
"charlie" | "charleth" => sr25519_dev::charlie(),
"dave" | "dorothy" => sr25519_dev::dave(),
"eve" | "ethan" => sr25519_dev::eve(),
"ferdie" | "faith" => sr25519_dev::ferdie(),
_ => panic!("unknown keyring name"),
},
AccountGenerateRequest::Derived(seed, i) => {
use std::str::FromStr;
let derivation = format!("{seed}//{i}");
let u = subxt_signer::SecretUri::from_str(&derivation).unwrap();
<subxt_signer::sr25519::Keypair>::from_uri(&u).unwrap()
},
}
}
/// Interface for implementors of keypairs generators.
pub trait GenerateKeyPairFunction<KP>:
Fn(AccountGenerateRequest) -> KP + Copy + Send + 'static
{
}
impl<T, KP> GenerateKeyPairFunction<KP> for T where
T: Fn(AccountGenerateRequest) -> KP + Copy + Send + 'static
{
}
/// Logic that derives accounts from a certain seed.
pub fn derive_accounts<C, KP, G>(
accounts_description: AccountsDescription,
seed: &str,
generate: G,
) -> HashMap<String, (KP, AccountMetadata)>
where
C: subxt::Config,
KP: Signer<C> + Send + Sync + 'static,
G: GenerateKeyPairFunction<KP>,
{
match accounts_description {
AccountsDescription::Derived(range) => {
let from_id = range.start as usize;
let to_id = range.end as usize;
let n = to_id - from_id;
let t = std::cmp::min(
n,
std::thread::available_parallelism().unwrap_or(1usize.try_into().unwrap()).get(),
);
let mut threads = Vec::new();
(0..t).for_each(|thread_idx| {
// let chunk = (thread_idx * (n / t))..((thread_idx + 1) * (n / t));
let chunk =
(from_id + (thread_idx * n) / t)..(from_id + ((thread_idx + 1) * n) / t);
let seed = seed.to_string().clone();
threads.push(std::thread::spawn(move || {
chunk
.into_iter()
.map(move |i| {
(
i.to_string(),
(
generate(AccountGenerateRequest::Derived(
seed.to_string(),
i as u32,
)),
AccountMetadata::Derived(i as u32),
),
)
})
.collect::<Vec<_>>()
}));
});
threads
.into_iter()
.flat_map(|h| h.join().unwrap())
// .map(|p| (p, funds))
.collect()
},
AccountsDescription::Keyring(account) => HashMap::from([(
account.clone(),
(
generate(AccountGenerateRequest::Keyring(account.clone())),
AccountMetadata::KeyRing(account),
),
)]),
}
}
#[allow(clippy::too_many_arguments)]
async fn create_online_transaction<C: subxt::Config, KP, B>(
from_keypair: &KP,
nonce: u128,
mortality: &Option<u64>,
account: &str,
sink: &SubxtTransactionsSink<C, KP>,
from_account_id: &<C as subxt::Config>::AccountId,
to_account_id: &<C as subxt::Config>::AccountId,
tip: u128,
payload_builder: &B,
) -> Result<SubxtTransaction<C>, Error>
where
AccountIdOf<C>: Send + Sync + AsRef<[u8]>,
KP: Signer<C> + Clone + Send + Sync + 'static,
<<C as subxt::Config>::ExtrinsicParams as subxt::config::ExtrinsicParams<C>>::Params: From<(
(),
(),
(),
CheckNonceParams,
(),
CheckMortalityParams<C>,
ChargeAssetTxPaymentParams<C>,
ChargeTransactionPaymentParams,
(),
)>,
B: Fn(&TxPayloadBuildContext<AccountIdOf<C>>) -> DynamicPayload + ?Sized,
{
// Needed because `Params` as associated type does not implement clone, and we need to
// recreate the tx params in a loop when we can't create a partial tx with the online
// client, due to various RPC related issues or state not being up to date (currently we
// handle an error which happens when trying to create a partial tx that is based on a
// certain finalized block returned by the RPC, which is then reported as not found).
// Retrying seems to fix the issue.
fn tx_params<CC: subxt::Config>(
mortality: &Option<u64>,
nonce: u64,
tip: u128,
) -> <DefaultExtrinsicParams<CC> as ExtrinsicParams<CC>>::Params {
let mut params = <SubstrateExtrinsicParamsBuilder<CC>>::new().nonce(nonce).tip(tip);
if let Some(mortal) = mortality {
params = params.mortal(*mortal);
}
params.build()
}
// Creates a subxt transaction.
//
// The mortality of the transaction involves setting up a block until the transaction is valid,
// which needs fetching the last finalized block number on chain similarly to subxt:
// https://github.com/paritytech/subxt/blob/77b6abccbacf194f3889610024e2f4024e8c2822/subxt/src/tx/tx_client.rs#L600
async fn subxt_transaction<CC: subxt::Config, KEYP>(
sink: &SubxtTransactionsSink<CC, KEYP>,
mut partial_tx: PartialTransaction<CC, OnlineClient<CC>>,
from_keypair: &KEYP,
nonce: u128,
mortality: &Option<u64>,
account: &str,
) -> Result<SubxtTransaction<CC>, Error>
where
KEYP: Signer<CC> + Clone + Send + Sync + 'static,
AccountIdOf<CC>: Send + Sync + AsRef<[u8]>,
{
let block_number = if mortality.is_some() {
let block_ref = sink
.api()
.backend()
.latest_finalized_block_ref()
.await
.expect("to get the last finalized block ref. qed");
let block = sink
.api()
.blocks()
.at(block_ref)
.await
.expect("to get the corresponding block header. qed");
Some(block.number().into())
} else {
None
};
let submittable_tx = partial_tx.sign(from_keypair);
let hash = submittable_tx.hash();
debug!(target:LOG_TARGET,"built mortal tx hash: {:?}", hash);
Ok(SubxtTransaction::<CC>::new(
submittable_tx,
nonce,
mortality.and_then(|mortal| block_number.map(|number| number + mortal)),
sink.get_to_account_metadata(account).expect("account metadata exists"),
))
}
let ctx = TxPayloadBuildContext { to_account_id, from_account_id, account, nonce };
let tx_call = payload_builder(&ctx);
for _ in 0..DEFAULT_RETRIES_FOR_PARTIAL_TX_CREATION {
let params = tx_params(mortality, nonce as u64, tip);
match sink.api().tx().create_partial(&tx_call, from_account_id, params.into()).await {
Ok(tx) =>
return subxt_transaction(sink, tx, from_keypair, nonce, mortality, account).await,
Err(_) => continue,
}
}
error!(target: LOG_TARGET, "Attempting transaction creation with the online client, to factor in the provided mortality, failed.");
Err(Error::Other("failed to create transaction with online client".to_string()))
}
/// Builds a transaction with subxt.
pub(crate) async fn build_subxt_tx<C, KP, B>(
params: &crate::transaction::BuildTransactionParams<'_>,
sink: &SubxtTransactionsSink<C, KP>,
payload_builder: &B,
) -> SubxtTransaction<C>
where
AccountIdOf<C>: Send + Sync + AsRef<[u8]>,
C: subxt::Config,
KP: Signer<C> + Clone + Send + Sync + 'static,
<<C as subxt::Config>::ExtrinsicParams as subxt::config::ExtrinsicParams<C>>::Params: From<(
(),
(),
(),
CheckNonceParams,
(),
CheckMortalityParams<C>,
ChargeAssetTxPaymentParams<C>,
ChargeTransactionPaymentParams,
(),
)>,
B: Fn(&TxPayloadBuildContext<AccountIdOf<C>>) -> DynamicPayload + ?Sized,
{
let &crate::transaction::BuildTransactionParams { account, nonce, mortality, tip } = params;
let to_account_id = sink.get_to_account_id(account).expect("to account exists");
let from_account_id = sink.get_from_account_id(account).expect("from account exists");
let from_keypair = sink.get_from_key_pair(account).expect("from account exists");
let nonce = if let Some(nonce) = nonce {
trace!("nonce for {:?} -> {:?}", account, nonce);
*nonce
} else {
let nonce = sink
.check_account_nonce(from_account_id.clone())
.await
.expect("account nonce shall exists");
trace!("checked nonce for {:?} -> {:?}", account, nonce);
nonce
};
debug!(
target:LOG_TARGET,
account,
nonce,
?mortality,
from_account=hex::encode(from_account_id.clone()),
to_account=hex::encode(to_account_id.clone()),
"build_subxt_tx"
);
if mortality.is_some() {
create_online_transaction(
&from_keypair,
nonce,
mortality,
account,
sink,
&from_account_id,
&to_account_id,
tip,
payload_builder,
)
.await
.expect("failed to create mortal transaction")
} else {
let tx_params = <SubstrateExtrinsicParamsBuilder<C>>::new()
.nonce(nonce as u64)
.tip(tip)
.build()
.into();
let ctx = TxPayloadBuildContext {
to_account_id: &to_account_id,
from_account_id: &from_account_id,
account,
nonce,
};
let tx_call = payload_builder(&ctx);
let tx = SubxtTransaction::<C>::new(
sink.api()
.tx()
.create_partial_offline(&tx_call, tx_params)
.unwrap()
.sign(&from_keypair),
nonce as u128,
None,
sink.get_to_account_metadata(account).expect("account metadata exists"),
);
debug!(target:LOG_TARGET,"built immortal tx hash: {:?}", tx.hash());
tx
}
}
#[cfg(test)]
mod tests {
use subxt::SubstrateConfig;
use crate::{
subxt_transaction::{
derive_accounts, generate_sr25519_keypair, AccountGenerateRequest, SENDER_SEED,
},
transaction::AccountMetadata,
};
#[tokio::test]
async fn test_derive_accounts_len() {
let accounts = derive_accounts::<SubstrateConfig, subxt_signer::sr25519::Keypair, _>(
crate::scenario::AccountsDescription::Derived(0..11),
SENDER_SEED,
generate_sr25519_keypair,
);
assert_eq!(accounts.len(), 11);
for (i, (kp, meta)) in accounts {
let id = i.parse::<u32>().unwrap();
assert_eq!(
kp.public_key().0,
generate_sr25519_keypair(AccountGenerateRequest::Derived(
SENDER_SEED.to_string(),
id
))
.public_key()
.0
);
assert_eq!(AccountMetadata::Derived(id), meta);
}
let accounts = derive_accounts::<SubstrateConfig, subxt_signer::sr25519::Keypair, _>(
crate::scenario::AccountsDescription::Keyring("alice".to_string()),
SENDER_SEED,
generate_sr25519_keypair,
);
assert_eq!(accounts.len(), 1);
assert_eq!(
accounts.get("alice").unwrap().0.public_key().0,
generate_sr25519_keypair(AccountGenerateRequest::Keyring("alice".to_string()))
.public_key()
.0
);
assert_eq!(accounts.get("alice").unwrap().1, AccountMetadata::KeyRing("alice".to_string()))
}
}
@@ -0,0 +1,280 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.
use crate::{
error::Error,
fake_transaction::{FakeHash, FakeTransaction},
fake_transaction_sink::FakeTransactionsSink,
helpers::StreamOf,
runner::DefaultTxTask,
subxt_transaction::{
build_subxt_tx, EthPayloadBuilderFn, EthRuntimeConfig, EthTransaction, EthTransactionsSink,
HashOf, SubPayloadBuilderFn, SubstrateTransaction, SubstrateTransactionsSink,
},
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::any::Any;
use subxt::{tx::TxStatus, OnlineClient, PolkadotConfig};
use subxt_core::config::Hash as BlockHash;
/// Parameters for building a transaction.
pub(crate) struct BuildTransactionParams<'a> {
pub account: &'a str,
pub nonce: &'a Option<u128>,
pub mortality: &'a Option<u64>,
pub tip: u128,
}
/// Interface for transaction building.
#[async_trait]
pub(crate) trait TransactionBuilder {
type HashType: BlockHash;
type Transaction: Transaction<HashType = Self::HashType>;
type Sink: TransactionsSink<Self::HashType>;
type PayloadBuilder: Send + Sync;
async fn build_transaction<'a>(
&self,
watched: bool,
params: BuildTransactionParams<'a>,
sink: &Self::Sink,
payload_builder: &Self::PayloadBuilder,
) -> DefaultTxTask<Self::Transaction>;
}
/// Substrate transactions builder.
#[derive(Default)]
pub(crate) struct SubstrateTransactionBuilder {}
#[async_trait]
impl TransactionBuilder for SubstrateTransactionBuilder {
type HashType = HashOf<PolkadotConfig>;
type Transaction = SubstrateTransaction;
type Sink = SubstrateTransactionsSink;
type PayloadBuilder = SubPayloadBuilderFn;
async fn build_transaction<'a>(
&self,
watched: bool,
params: BuildTransactionParams<'a>,
sink: &Self::Sink,
payload_builder: &Self::PayloadBuilder,
) -> DefaultTxTask<Self::Transaction> {
let tx = build_subxt_tx(&params, sink, &**payload_builder).await;
if watched {
DefaultTxTask::<Self::Transaction>::new_watched(tx)
} else {
DefaultTxTask::<Self::Transaction>::new_unwatched(tx)
}
}
}
/// Ethereum transactions builder.
#[derive(Default)]
pub(crate) struct EthTransactionBuilder {}
#[async_trait]
impl TransactionBuilder for EthTransactionBuilder {
type HashType = HashOf<EthRuntimeConfig>;
type Transaction = EthTransaction;
type Sink = EthTransactionsSink;
type PayloadBuilder = EthPayloadBuilderFn;
async fn build_transaction<'a>(
&self,
watched: bool,
params: BuildTransactionParams<'a>,
sink: &Self::Sink,
payload_builder: &Self::PayloadBuilder,
) -> DefaultTxTask<Self::Transaction> {
let tx = build_subxt_tx(&params, sink, &**payload_builder).await;
if watched {
DefaultTxTask::<Self::Transaction>::new_watched(tx)
} else {
DefaultTxTask::<Self::Transaction>::new_unwatched(tx)
}
}
}
#[allow(dead_code)]
#[derive(Default)]
/// A transaction builder sink that's used as mock for logic relying on a transaction builder.
pub(crate) struct FakeTransactionBuilder;
#[async_trait]
impl TransactionBuilder for FakeTransactionBuilder {
type HashType = FakeHash;
type Transaction = FakeTransaction;
type Sink = FakeTransactionsSink;
type PayloadBuilder = ();
async fn build_transaction<'a>(
&self,
watched: bool,
params: BuildTransactionParams<'a>,
sink: &Self::Sink,
_payload_builder: &Self::PayloadBuilder,
) -> DefaultTxTask<Self::Transaction> {
if watched {
todo!()
};
let mut nonces = sink.nonces.write();
let nonce = if let Some(nonce) = nonces.get_mut(&hex::encode(params.account)) {
*nonce += 1;
*nonce
} else {
nonces.insert(hex::encode(params.account), 0);
0
};
let id = params.account.parse::<u32>().ok();
if let Some(i) = id {
DefaultTxTask::<FakeTransaction>::new_watched(FakeTransaction::new_multiple(
i,
nonce,
vec![],
))
} else {
DefaultTxTask::<FakeTransaction>::new_watched(FakeTransaction::new_with_keyring(
"alice".to_string(),
nonce,
vec![],
))
}
}
}
/// What account was used to sign transaction
#[derive(Default, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub enum AccountMetadata {
/// Holds index used for account derivation
#[default]
None,
Derived(u32),
KeyRing(String),
}
/// Type of transaction logic.
#[derive(Clone)]
pub enum TransactionCall {
Transfer,
Remark(u32),
}
#[derive(Clone)]
/// Type of transaction to execute.
pub struct TransactionRecipe {
pub(crate) call: TransactionCall,
}
impl TransactionRecipe {
pub fn transfer() -> Self {
Self { call: TransactionCall::Transfer }
}
pub fn remark(size: u32) -> Self {
Self { call: TransactionCall::Remark(size) }
}
}
/// Interface that asks for logic to decide if a transaction is done.
pub(crate) trait TransactionStatusIsDone {
fn is_terminal(&self) -> bool;
fn is_finalized(&self) -> bool;
fn is_error(&self) -> bool;
}
#[derive(Debug, PartialEq, Clone, Deserialize, Serialize)]
pub enum TransactionStatus<H> {
Validated,
Broadcasted,
InBlock(H),
NoLongerInBestBlock,
Finalized(H),
Dropped(String),
Invalid(String),
Error(String),
}
impl<H> TransactionStatus<H> {
pub(crate) fn get_letter(&self) -> char {
match self {
TransactionStatus::Validated => 'V',
TransactionStatus::Broadcasted => 'b',
TransactionStatus::InBlock(..) => 'B',
TransactionStatus::Finalized(..) => 'F',
TransactionStatus::Error { .. } => 'E',
TransactionStatus::Invalid { .. } => 'I',
TransactionStatus::Dropped { .. } => 'D',
TransactionStatus::NoLongerInBestBlock => 'N',
}
}
}
impl<H: BlockHash + std::fmt::Debug> TransactionStatus<H> {}
impl<C: subxt::Config> From<TxStatus<C, OnlineClient<C>>> for TransactionStatus<HashOf<C>> {
fn from(value: TxStatus<C, OnlineClient<C>>) -> Self {
match value {
TxStatus::Validated => TransactionStatus::Validated,
TxStatus::Broadcasted => TransactionStatus::Broadcasted,
TxStatus::InBestBlock(tx) => TransactionStatus::InBlock(tx.block_hash()),
TxStatus::InFinalizedBlock(tx) => TransactionStatus::Finalized(tx.block_hash()),
TxStatus::Error { message } => TransactionStatus::Error(message),
TxStatus::Invalid { message } => TransactionStatus::Invalid(message),
TxStatus::Dropped { message } => TransactionStatus::Dropped(message),
TxStatus::NoLongerInBestBlock => TransactionStatus::NoLongerInBestBlock,
}
}
}
impl<H: BlockHash> TransactionStatusIsDone for TransactionStatus<H> {
fn is_terminal(&self) -> bool {
matches!(self, Self::Finalized(_) | Self::Dropped(_) | Self::Invalid(_) | Self::Error(_))
}
fn is_finalized(&self) -> bool {
matches!(self, Self::Finalized(_))
}
fn is_error(&self) -> bool {
matches!(self, Self::Dropped(_) | Self::Invalid(_) | Self::Error(_))
}
}
/// Interface for a multi-chain transaction abstraction.
pub trait Transaction: Send + Sync {
type HashType: BlockHash + 'static;
fn hash(&self) -> Self::HashType;
fn as_any(&self) -> &dyn Any;
fn nonce(&self) -> u128;
fn valid_until(&self) -> &Option<u64>;
fn account_metadata(&self) -> AccountMetadata;
}
/// Interface for monitoring transaction state.
#[async_trait]
pub trait TransactionMonitor<H: BlockHash> {
/// Wait for the transaction to finalize.
///
/// An optional block number is given to be considered for waiting when needed.
async fn wait(&self, tx_hash: H, until: Option<u64>) -> Result<H, Error>;
}
/// Abstraction for RPC client
#[async_trait]
pub trait TransactionsSink<H: BlockHash>: Send + Sync {
async fn submit_and_watch(
&self,
tx: &dyn Transaction<HashType = H>,
) -> Result<StreamOf<TransactionStatus<H>>, Error>;
async fn submit(&self, tx: &dyn Transaction<HashType = H>) -> Result<H, Error>;
///Current count of transactions being processed by sink
async fn pending_extrinsics(&self) -> usize;
fn transaction_monitor(&self) -> Option<&dyn TransactionMonitor<H>>;
}
File diff suppressed because one or more lines are too long