Merge remote-tracking branch 'origin/master' into lenxv/light-client-testing

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
This commit is contained in:
Alexandru Vasile
2024-03-26 18:05:45 +02:00
109 changed files with 12570 additions and 5033 deletions
@@ -14,7 +14,9 @@ use subxt::{
FollowEvent, Initialized, MethodResponse, RuntimeEvent, RuntimeVersionEvent, StorageQuery,
StorageQueryType,
},
utils::AccountId32,
config::Hasher,
utils::{AccountId32, MultiAddress},
SubstrateConfig,
};
#[cfg(lightclient)]
@@ -36,7 +38,7 @@ async fn chainhead_unstable_follow() {
assert_eq!(
event,
FollowEvent::Initialized(Initialized {
finalized_block_hash,
finalized_block_hashes: vec![finalized_block_hash],
finalized_block_runtime: None,
})
);
@@ -51,7 +53,7 @@ async fn chainhead_unstable_follow() {
assert_matches!(
event,
FollowEvent::Initialized(init) => {
assert_eq!(init.finalized_block_hash, finalized_block_hash);
assert_eq!(init.finalized_block_hashes, vec![finalized_block_hash]);
if let Some(RuntimeEvent::Valid(RuntimeVersionEvent { spec })) = init.finalized_block_runtime {
assert_eq!(spec.spec_version, runtime_version.spec_version);
assert_eq!(spec.transaction_version, runtime_version.transaction_version);
@@ -70,7 +72,7 @@ async fn chainhead_unstable_body() {
let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
@@ -99,7 +101,7 @@ async fn chainhead_unstable_header() {
let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
@@ -127,7 +129,7 @@ async fn chainhead_unstable_storage() {
let mut blocks = rpc.chainhead_unstable_follow(false).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
@@ -172,7 +174,7 @@ async fn chainhead_unstable_call() {
let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
@@ -209,7 +211,7 @@ async fn chainhead_unstable_unpin() {
let mut blocks = rpc.chainhead_unstable_follow(true).await.unwrap();
let event = blocks.next().await.unwrap().unwrap();
let hash = match event {
FollowEvent::Initialized(init) => init.finalized_block_hash,
FollowEvent::Initialized(init) => init.finalized_block_hashes.last().unwrap().clone(),
_ => panic!("Unexpected event"),
};
let sub_id = blocks.subscription_id().unwrap();
@@ -269,7 +271,7 @@ async fn transaction_unstable_submit_and_watch() {
let tx_bytes = ctx
.client()
.tx()
.create_signed_with_nonce(&payload, &dev::alice(), 0, Default::default())
.create_signed_offline(&payload, &dev::alice(), Default::default())
.unwrap()
.into_encoded();
@@ -317,3 +319,108 @@ async fn next_operation_event<
panic!("Cannot find operation related event after {NUM_EVENTS} produced events");
}
#[tokio::test]
async fn transaction_unstable_broadcast() {
    // Submit a balance transfer via `transaction_unstable_broadcast` and verify
    // that the transaction eventually lands in a finalized block with the
    // expected transfer amount.
    let bob = dev::bob();
    let bob_address: MultiAddress<AccountId32, u32> = bob.public_key().into();

    let ctx = test_context().await;
    let api = ctx.client();
    let rpc = ctx.unstable_rpc_methods().await;

    let tx = node_runtime::tx()
        .balances()
        .transfer_allow_death(bob_address, 10_001);

    let tx_bytes = ctx
        .client()
        .tx()
        .create_signed_offline(&tx, &dev::alice(), Default::default())
        .unwrap()
        .into_encoded();

    // Hash the extrinsic bytes so we can recognise the tx inside a block.
    // NOTE(review): `[2..]` assumes the encoded tx carries a 2-byte compact
    // length prefix that is absent from the in-block extrinsic bytes — confirm
    // this holds for the tx sizes used here.
    let tx_hash = <SubstrateConfig as subxt::Config>::Hasher::hash(&tx_bytes[2..]);

    // Subscribe to finalized blocks.
    let mut finalized_sub = api.blocks().subscribe_finalized().await.unwrap();
    // Expect the tx to be encountered within this many finalized blocks.
    let mut num_blocks: usize = 10;

    // Submit the transaction.
    let _operation_id = rpc
        .transaction_unstable_broadcast(&tx_bytes)
        .await
        .unwrap()
        .expect("Server is not overloaded by 1 tx; qed");

    while let Some(finalized) = finalized_sub.next().await {
        let finalized = finalized.unwrap();

        let extrinsics = finalized.extrinsics().await.unwrap();
        let block_extrinsics = extrinsics
            .iter()
            .map(|res| res.unwrap())
            .collect::<Vec<_>>();

        // Look for our tx by comparing hashes of the raw extrinsic bytes.
        if let Some(ext) = block_extrinsics
            .iter()
            .find(|ext| <SubstrateConfig as subxt::Config>::Hasher::hash(ext.bytes()) == tx_hash)
        {
            let ext = ext
                .as_extrinsic::<node_runtime::balances::calls::types::TransferAllowDeath>()
                .unwrap()
                .unwrap();
            assert_eq!(ext.value, 10_001);
            return;
        }

        // Fix: decrement *after* inspecting the block so that all `num_blocks`
        // finalized blocks are actually searched. Previously the counter was
        // decremented and checked before inspection, so the last allowed block
        // triggered the panic without ever being examined.
        num_blocks -= 1;
        if num_blocks == 0 {
            panic!("Did not find the tx in due time");
        }
    }
}
#[tokio::test]
async fn transaction_unstable_stop() {
    // Verify `transaction_unstable_stop` semantics: stopping an unknown
    // operation fails, stopping an in-flight broadcast succeeds exactly once,
    // and a second stop of the same operation fails.
    let bob = dev::bob();
    let bob_address: MultiAddress<AccountId32, u32> = bob.public_key().into();

    let ctx = test_context().await;
    let rpc = ctx.unstable_rpc_methods().await;

    // Cannot stop an operation that was never started.
    let _err = rpc
        .transaction_unstable_stop("non-existent-operation-id")
        .await
        .unwrap_err();

    // Build and sign a transfer to broadcast, then stop it.
    let tx = node_runtime::tx()
        .balances()
        .transfer_allow_death(bob_address, 10_001);
    let tx_bytes = ctx
        .client()
        .tx()
        .create_signed_offline(&tx, &dev::alice(), Default::default())
        .unwrap()
        .into_encoded();

    // Submit the transaction.
    let operation_id = rpc
        .transaction_unstable_broadcast(&tx_bytes)
        .await
        .unwrap()
        .expect("Server is not overloaded by 1 tx; qed");

    // Stopping the in-flight operation succeeds.
    rpc.transaction_unstable_stop(&operation_id).await.unwrap();

    // Cannot stop it twice.
    let _err = rpc
        .transaction_unstable_stop(&operation_id)
        .await
        .unwrap_err();
}
File diff suppressed because one or more lines are too long
@@ -60,10 +60,7 @@ async fn storage_map_lookup() -> Result<(), subxt::Error> {
Ok(())
}
// This fails until the fix in https://github.com/paritytech/subxt/pull/458 is introduced.
// Here we create a key that looks a bit like a StorageNMap key, but should in fact be
// treated as a StorageKey (ie we should hash both values together with one hasher, rather
// than hash both values separately, or ignore the second value).
#[cfg(fullclient)]
#[subxt_test]
async fn storage_n_mapish_key_is_properly_created() -> Result<(), subxt::Error> {
@@ -78,18 +75,21 @@ async fn storage_n_mapish_key_is_properly_created() -> Result<(), subxt::Error>
.session()
.key_owner(KeyTypeId([1, 2, 3, 4]), [5u8, 6, 7, 8]);
let actual_key_bytes = api.storage().address_bytes(&actual_key)?;
// Let's manually hash to what we assume it should be and compare:
let expected_key_bytes = {
// Hash the prefix to the storage entry:
let mut bytes = sp_core::twox_128("Session".as_bytes()).to_vec();
bytes.extend(&sp_core::twox_128("KeyOwner".as_bytes())[..]);
// twox64_concat a *tuple* of the args expected:
let suffix = (KeyTypeId([1, 2, 3, 4]), vec![5u8, 6, 7, 8]).encode();
bytes.extend(sp_core::twox_64(&suffix));
bytes.extend(&suffix);
// Both keys, use twox64_concat hashers:
let key1 = KeyTypeId([1, 2, 3, 4]).encode();
let key2 = vec![5u8, 6, 7, 8].encode();
bytes.extend(sp_core::twox_64(&key1));
bytes.extend(&key1);
bytes.extend(sp_core::twox_64(&key2));
bytes.extend(&key2);
bytes
};
dbg!(&expected_key_bytes);
assert_eq!(actual_key_bytes, expected_key_bytes);
Ok(())
@@ -174,9 +174,9 @@ async fn storage_partial_lookup() -> Result<(), subxt::Error> {
let addr_bytes = api.storage().address_bytes(&addr)?;
let mut results = api.storage().at_latest().await?.iter(addr).await?;
let mut approvals = Vec::new();
while let Some(Ok((key, value))) = results.next().await {
assert!(key.starts_with(&addr_bytes));
approvals.push(value);
while let Some(Ok(kv)) = results.next().await {
assert!(kv.key_bytes.starts_with(&addr_bytes));
approvals.push(kv.value);
}
assert_eq!(approvals.len(), assets.len());
let mut amounts = approvals.iter().map(|a| a.amount).collect::<Vec<_>>();
@@ -195,9 +195,10 @@ async fn storage_partial_lookup() -> Result<(), subxt::Error> {
let mut results = api.storage().at_latest().await?.iter(addr).await?;
let mut approvals = Vec::new();
while let Some(Ok((key, value))) = results.next().await {
assert!(key.starts_with(&addr_bytes));
approvals.push(value);
while let Some(Ok(kv)) = results.next().await {
assert!(kv.key_bytes.starts_with(&addr_bytes));
assert!(kv.keys.decoded().is_ok());
approvals.push(kv.value);
}
assert_eq!(approvals.len(), 1);
assert_eq!(approvals[0].amount, amount);
@@ -29,13 +29,10 @@
use crate::utils::node_runtime;
use codec::Compact;
use subxt::{
client::{LightClient, LightClientBuilder, OnlineClientT},
config::PolkadotConfig,
};
use subxt::{client::OnlineClient, config::PolkadotConfig, lightclient::LightClient};
use subxt_metadata::Metadata;
type Client = LightClient<PolkadotConfig>;
type Client = OnlineClient<PolkadotConfig>;
// Check that we can subscribe to non-finalized blocks.
async fn non_finalized_headers_subscription(api: &Client) -> Result<(), subxt::Error> {
@@ -119,9 +116,11 @@ async fn dynamic_events(api: &Client) -> Result<(), subxt::Error> {
#[tokio::test]
async fn light_client_testing() -> Result<(), subxt::Error> {
let api: LightClient<PolkadotConfig> = LightClientBuilder::new()
.build_from_url("wss://rpc.polkadot.io:443")
.await?;
let chainspec = subxt::utils::fetch_chainspec_from_rpc_node("wss://rpc.polkadot.io:443")
.await
.unwrap();
let (_lc, rpc) = LightClient::relay_chain(chainspec.get())?;
let api = Client::from_rpc_client(rpc).await?;
non_finalized_headers_subscription(&api).await?;
finalized_headers_subscription(&api).await?;
@@ -11,9 +11,6 @@ use subxt::{
Config, OnlineClient,
};
#[cfg(lightclient)]
use subxt::client::{LightClient, LightClientBuilder};
/// Spawn a local substrate node for testing subxt.
pub struct TestNodeProcess<R: Config> {
// Keep a handle to the node; once it's dropped the node is killed.
@@ -24,12 +21,7 @@ pub struct TestNodeProcess<R: Config> {
legacy_client: RefCell<Option<OnlineClient<R>>>,
rpc_client: rpc::RpcClient,
#[cfg(fullclient)]
client: OnlineClient<R>,
#[cfg(lightclient)]
client: LightClient<R>,
}
impl<R> TestNodeProcess<R>
@@ -92,16 +84,9 @@ where
/// will use the legacy backend by default or the unstable backend if the
/// "unstable-backend-client" feature is enabled, so that we can run each
/// test against both.
#[cfg(fullclient)]
pub fn client(&self) -> OnlineClient<R> {
self.client.clone()
}
/// Returns the subxt client connected to the running node.
#[cfg(lightclient)]
pub fn client(&self) -> LightClient<R> {
self.client.clone()
}
}
/// Construct a test node process.
@@ -198,7 +183,7 @@ async fn build_rpc_client(ws_url: &str) -> Result<rpc::RpcClient, String> {
async fn build_legacy_client<T: Config>(
rpc_client: rpc::RpcClient,
) -> Result<OnlineClient<T>, String> {
let backend = legacy::LegacyBackend::new(rpc_client);
let backend = legacy::LegacyBackend::builder().build(rpc_client);
let client = OnlineClient::from_backend(Arc::new(backend))
.await
.map_err(|e| format!("Cannot construct OnlineClient from backend: {e}"))?;
@@ -232,49 +217,46 @@ async fn build_unstable_client<T: Config>(
}
#[cfg(lightclient)]
async fn build_light_client<T: Config>(proc: &SubstrateNode) -> Result<LightClient<T>, String> {
async fn build_light_client<T: Config>(proc: &SubstrateNode) -> Result<OnlineClient<T>, String> {
use subxt::lightclient::{ChainConfig, LightClient};
// RPC endpoint.
let ws_url = format!("ws://127.0.0.1:{}", proc.ws_port());
// Step 1. Wait for a few blocks to be produced using the subxt client.
// Wait for a few blocks to be produced using the subxt client.
let client = OnlineClient::<T>::from_url(ws_url.clone())
.await
.map_err(|err| format!("Failed to connect to node rpc at {ws_url}: {err}"))?;
// Wait for at least 3 blocks before starting the light client.
// Wait for at least 3 blocks before starting the light client.
// Otherwise, the lightclient might error with
// `"Error when retrieving the call proof: No node available for call proof query"`.
super::wait_for_number_of_blocks(&client, 4).await;
// Step 2. Construct the light client.
// P2p bootnode.
// Now, configure a light client; fetch the chain spec and modify the bootnodes.
let bootnode = format!(
"/ip4/127.0.0.1/tcp/{}/p2p/{}",
proc.p2p_port(),
proc.p2p_address()
);
let mut result = LightClientBuilder::new()
.bootnodes([bootnode.as_str()])
.build_from_url(ws_url.as_str())
let chain_spec = subxt::utils::fetch_chainspec_from_rpc_node(ws_url.as_str())
.await
.map_err(|e| format!("Failed to construct light client {}", e));
.map_err(|e| format!("Failed to obtain chain spec from local machine: {e}"))?;
for _ in 0..3 {
if let Err(e) = &result {
if e.contains("Error when retrieving the call proof") {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let chain_config = ChainConfig::chain_spec(chain_spec.get())
.set_bootnodes([bootnode.as_str()])
.map_err(|e| format!("Light client: cannot update boot nodes: {e}"))?;
result = LightClientBuilder::new()
.bootnodes([bootnode.as_str()])
.build_from_url(ws_url.as_str())
.await
.map_err(|e| format!("Failed to construct light client {}", e));
}
} else {
break;
}
}
// Instantiate the light client.
let (_lightclient, rpc) = LightClient::relay_chain(chain_config)
.map_err(|e| format!("Light client: cannot add relay chain: {e}"))?;
result
// Instantiate subxt client from this.
let api = OnlineClient::from_rpc_client(rpc)
.await
.map_err(|e| format!("Failed to build OnlineClient from light client RPC: {e}"))?;
Ok(api)
}