Add tests for legacy Backend impl (#1751)

---------

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
This commit is contained in:
Pavlo Khrystenko
2024-09-06 11:26:22 +02:00
committed by GitHub
parent 398d2a8605
commit b8735e5782
4 changed files with 430 additions and 17 deletions
+1 -1
View File
@@ -126,7 +126,7 @@ wasm-bindgen-futures = { workspace = true, optional = true }
bitvec = { workspace = true }
codec = { workspace = true, features = ["derive", "bit-vec"] }
scale-info = { workspace = true, features = ["bit-vec"] }
tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "time", "rt-multi-thread", "sync"] }
sp-core = { workspace = true }
sp-keyring = { workspace = true }
sp-runtime = { workspace = true }
+24 -16
View File
@@ -95,28 +95,36 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
keys: Vec<Vec<u8>>,
at: T::Hash,
) -> Result<StreamOfResults<StorageResponse>, Error> {
retry(|| async {
let keys = keys.clone();
let methods = self.methods.clone();
// For each key, return it + a future to get the result.
let iter = keys.into_iter().map(move |key| {
fn get_entry<T: Config>(
key: Vec<u8>,
at: T::Hash,
methods: LegacyRpcMethods<T>,
) -> impl Future<Output = Result<Option<StorageResponse>, Error>> {
retry(move || {
let methods = methods.clone();
let key = key.clone();
async move {
let res = methods.state_get_storage(&key, Some(at)).await?;
Ok(res.map(|value| StorageResponse { key, value }))
Ok(res.map(move |value| StorageResponse { key, value }))
}
});
})
}
let s = stream::iter(iter)
// Resolve the future
.then(|fut| fut)
// Filter any Options out (ie if we didn't find a value at some key we return nothing for it).
.filter_map(|r| future::ready(r.transpose()));
let keys = keys.clone();
let methods = self.methods.clone();
Ok(StreamOf(Box::pin(s)))
})
.await
// For each key, return it + a future to get the result.
let iter = keys
.into_iter()
.map(move |key| get_entry(key, at, methods.clone()));
let s = stream::iter(iter)
// Resolve the future
.then(|fut| fut)
// Filter any Options out (ie if we didn't find a value at some key we return nothing for it).
.filter_map(|r| future::ready(r.transpose()));
Ok(StreamOf(Box::pin(s)))
}
async fn storage_fetch_descendant_keys(
+1
View File
@@ -455,6 +455,7 @@ pub type EncodedJustification = Vec<u8>;
/// the RPC call `state_getRuntimeVersion`,
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
#[cfg_attr(test, derive(serde::Serialize))]
pub struct RuntimeVersion {
/// Version of the runtime specification. A full-node will not attempt to use its native
/// runtime in substitute for the on-chain Wasm runtime unless all of `spec_name`,
+404
View File
@@ -325,9 +325,413 @@ pub enum TransactionStatus<Hash> {
/// A response from calls like [`Backend::storage_fetch_values`] or
/// [`Backend::storage_fetch_descendant_values`].
#[cfg_attr(test, derive(serde::Serialize, Clone, PartialEq, Debug))]
pub struct StorageResponse {
/// The key.
pub key: Vec<u8>,
/// The associated value.
pub value: Vec<u8>,
}
#[cfg(test)]
mod test {
use super::*;
mod legacy {
use super::rpc::{RpcClient, RpcClientT};
use crate::backend::rpc::RawRpcSubscription;
use crate::backend::BackendExt;
use crate::{
backend::{
legacy::rpc_methods::Bytes, legacy::rpc_methods::RuntimeVersion,
legacy::LegacyBackend, StorageResponse,
},
error::RpcError,
};
use futures::StreamExt;
use serde::Serialize;
use serde_json::value::RawValue;
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use subxt_core::{config::DefaultExtrinsicParams, Config};
use tokio::sync::{mpsc, Mutex};
type RpcResult<T> = Result<T, RpcError>;
type Item = RpcResult<String>;
struct MockDataTable {
items: HashMap<Vec<u8>, VecDeque<Item>>,
}
impl MockDataTable {
fn new() -> Self {
MockDataTable {
items: HashMap::new(),
}
}
fn from_iter<'a, T: Serialize, I: IntoIterator<Item = (&'a str, RpcResult<T>)>>(
item: I,
) -> Self {
let mut data = Self::new();
for (key, item) in item.into_iter() {
data.push(key.into(), item);
}
data
}
fn push<I: Serialize>(&mut self, key: Vec<u8>, item: RpcResult<I>) {
let item = item.map(|x| serde_json::to_string(&x).unwrap());
match self.items.entry(key) {
std::collections::hash_map::Entry::Occupied(v) => v.into_mut().push_back(item),
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(VecDeque::from([item]));
}
}
}
fn pop(&mut self, key: Vec<u8>) -> Item {
self.items.get_mut(&key).unwrap().pop_front().unwrap()
}
}
struct Subscription {
sender: mpsc::Sender<RpcResult<Vec<Item>>>,
receiver: mpsc::Receiver<RpcResult<Vec<Item>>>,
}
impl Subscription {
fn new() -> Self {
let (sender, receiver) = mpsc::channel(32);
Self { sender, receiver }
}
async fn from_iter<
T: Serialize,
S: IntoIterator<Item = RpcResult<Vec<RpcResult<T>>>>,
>(
items: S,
) -> Self {
let sub = Self::new();
for i in items {
let i: RpcResult<Vec<Item>> = i.map(|items| {
items
.into_iter()
.map(|item| item.map(|i| serde_json::to_string(&i).unwrap()))
.collect()
});
sub.write(i).await
}
sub
}
async fn read(&mut self) -> RpcResult<Vec<Item>> {
self.receiver.recv().await.unwrap()
}
async fn write(&self, items: RpcResult<Vec<Item>>) {
self.sender.send(items).await.unwrap()
}
}
struct Data {
request: MockDataTable,
subscription: Subscription,
}
struct MockRpcClientStorage {
data: Arc<Mutex<Data>>,
}
impl RpcClientT for MockRpcClientStorage {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<serde_json::value::RawValue>>,
) -> super::rpc::RawRpcFuture<'a, Box<serde_json::value::RawValue>> {
Box::pin(async move {
match method {
"state_getStorage" => {
let mut data = self.data.lock().await;
let params = params.map(|p| p.get().to_string());
let rpc_params = jsonrpsee::types::Params::new(params.as_deref());
let key: sp_core::Bytes = rpc_params.sequence().next().unwrap();
let value = data.request.pop(key.0);
value.map(|v| serde_json::value::RawValue::from_string(v).unwrap())
}
"chain_getBlockHash" => {
let mut data = self.data.lock().await;
let value = data.request.pop("chain_getBlockHash".into());
value.map(|v| serde_json::value::RawValue::from_string(v).unwrap())
}
_ => todo!(),
}
})
}
fn subscribe_raw<'a>(
&'a self,
_sub: &'a str,
_params: Option<Box<serde_json::value::RawValue>>,
_unsub: &'a str,
) -> super::rpc::RawRpcFuture<'a, super::rpc::RawRpcSubscription> {
Box::pin(async {
let mut data = self.data.lock().await;
let values: RpcResult<Vec<RpcResult<Box<RawValue>>>> =
data.subscription.read().await.map(|v| {
v.into_iter()
.map(|v| {
v.map(|v| serde_json::value::RawValue::from_string(v).unwrap())
})
.collect::<Vec<RpcResult<Box<RawValue>>>>()
});
values.map(|v| RawRpcSubscription {
stream: futures::stream::iter(v).boxed(),
id: Some("ID".to_string()),
})
})
}
}
// Define dummy config
enum Conf {}
impl Config for Conf {
type Hash = crate::utils::H256;
type AccountId = crate::utils::AccountId32;
type Address = crate::utils::MultiAddress<Self::AccountId, ()>;
type Signature = crate::utils::MultiSignature;
type Hasher = crate::config::substrate::BlakeTwo256;
type Header = crate::config::substrate::SubstrateHeader<u32, Self::Hasher>;
type ExtrinsicParams = DefaultExtrinsicParams<Self>;
type AssetId = u32;
}
use crate::backend::Backend;
fn client_runtime_version(num: u32) -> crate::client::RuntimeVersion {
crate::client::RuntimeVersion {
spec_version: num,
transaction_version: num,
}
}
fn runtime_version(num: u32) -> RuntimeVersion {
RuntimeVersion {
spec_version: num,
transaction_version: num,
other: HashMap::new(),
}
}
fn bytes(str: &str) -> RpcResult<Option<Bytes>> {
Ok(Some(Bytes(str.into())))
}
fn storage_response<K: Into<Vec<u8>>, V: Into<Vec<u8>>>(key: K, value: V) -> StorageResponse
where
Vec<u8>: From<K>,
{
StorageResponse {
key: key.into(),
value: value.into(),
}
}
async fn build_mock_client<
'a,
T: Serialize,
D: IntoIterator<Item = (&'a str, RpcResult<T>)>,
S: IntoIterator<Item = RpcResult<Vec<RpcResult<T>>>>,
>(
table_data: D,
subscription_data: S,
) -> RpcClient {
let data = Data {
request: MockDataTable::from_iter(table_data),
subscription: Subscription::from_iter(subscription_data).await,
};
RpcClient::new(MockRpcClientStorage {
data: Arc::new(Mutex::new(data)),
})
}
#[tokio::test]
async fn storage_fetch_values() {
let mock_data = vec![
("ID1", bytes("Data1")),
(
"ID2",
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
),
("ID2", bytes("Data2")),
(
"ID3",
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
),
("ID3", bytes("Data3")),
];
let rpc_client = build_mock_client(mock_data, vec![]).await;
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
// Test
let response = backend
.storage_fetch_values(
["ID1".into(), "ID2".into(), "ID3".into()].into(),
crate::utils::H256::random(),
)
.await
.unwrap();
let response = response
.map(|x| x.unwrap())
.collect::<Vec<StorageResponse>>()
.await;
let expected = vec![
storage_response("ID1", "Data1"),
storage_response("ID2", "Data2"),
storage_response("ID3", "Data3"),
];
assert_eq!(expected, response)
}
#[tokio::test]
async fn storage_fetch_value() {
// Setup
let mock_data = [
(
"ID1",
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
),
("ID1", bytes("Data1")),
];
let rpc_client = build_mock_client(mock_data, vec![]).await;
// Test
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let response = backend
.storage_fetch_value("ID1".into(), crate::utils::H256::random())
.await
.unwrap();
let response = response.unwrap();
assert_eq!("Data1".to_owned(), String::from_utf8(response).unwrap())
}
#[tokio::test]
/// This test should cover the logic of the following methods:
/// - `genesis_hash`
/// - `block_header`
/// - `block_body`
/// - `latest_finalized_block`
/// - `current_runtime_version`
/// - `current_runtime_version`
/// - `call`
/// The test covers them because they follow the simple pattern of:
/// ```no_run
/// async fn THE_THING(&self) -> Result<T::Hash, Error> {
/// retry(|| <DO THE THING> ).await
/// }
/// ```
async fn simple_fetch() {
let hash = crate::utils::H256::random();
let mock_data = vec![
(
"chain_getBlockHash",
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
),
("chain_getBlockHash", Ok(Some(hash))),
];
let rpc_client = build_mock_client(mock_data, vec![]).await;
// Test
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let response = backend.genesis_hash().await.unwrap();
assert_eq!(hash, response)
}
#[tokio::test]
/// This test should cover the logic of the following methods:
/// - `stream_runtime_version`
/// - `stream_all_block_headers`
/// - `stream_best_block_headers`
/// The test covers them because they follow the simple pattern of:
/// ```no_run
/// async fn stream_the_thing(
/// &self,
/// ) -> Result<StreamOfResults<(T::Header, BlockRef<T::Hash>)>, Error> {
/// let methods = self.methods.clone();
/// let retry_sub = retry_stream(move || {
/// let methods = methods.clone();
/// Box::pin(async move {
/// methods.do_the_thing().await?
/// });
/// Ok(StreamOf(Box::pin(sub)))
/// })
/// })
/// .await?;
/// Ok(retry_sub)
/// }
/// ```
async fn stream_simple() {
let mock_subscription_data = vec![
Ok(vec![
Ok(runtime_version(0)),
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
Ok(runtime_version(1)),
]),
Ok(vec![
Err(RpcError::DisconnectedWillReconnect(
"Reconnecting".to_string(),
)),
Ok(runtime_version(2)),
Ok(runtime_version(3)),
]),
Ok(vec![
Ok(runtime_version(4)),
Ok(runtime_version(5)),
Err(RpcError::RequestRejected("Reconnecting".to_string())),
]),
];
let rpc_client = build_mock_client(vec![], mock_subscription_data).await;
// Test
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let mut results = backend.stream_runtime_version().await.unwrap();
let mut expected = VecDeque::from(vec![
Ok::<crate::client::RuntimeVersion, crate::Error>(client_runtime_version(0)),
Ok(client_runtime_version(4)),
Ok(client_runtime_version(5)),
]);
while let Some(res) = results.next().await {
if res.is_ok() {
assert_eq!(expected.pop_front().unwrap().unwrap(), res.unwrap())
} else {
assert!(matches!(
res,
Err(crate::Error::Rpc(RpcError::RequestRejected(_)))
))
}
}
assert!(expected.is_empty());
assert!(results.next().await.is_none())
}
}
}