mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 16:17:59 +00:00
Enable parallel key scraping (#1985)
closes #174
---------
Co-authored-by: Liam Aharon <liam.aharon@hotmail.com>
Co-authored-by: Oliver Tale-Yazdi <oliver.tale-yazdi@parity.io>
This commit is contained in:
@@ -47,6 +47,7 @@ use std::{
|
||||
fs,
|
||||
ops::{Deref, DerefMut},
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use substrate_rpc_client::{rpc_params, BatchRequestBuilder, ChainApi, ClientT, StateApi};
|
||||
@@ -298,6 +299,7 @@ impl Default for SnapshotConfig {
|
||||
}
|
||||
|
||||
/// Builder for remote-externalities.
|
||||
#[derive(Clone)]
|
||||
pub struct Builder<B: BlockT> {
|
||||
/// Custom key-pairs to be injected into the final externalities. The *hashed* keys and values
|
||||
/// must be given.
|
||||
@@ -400,41 +402,134 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// Get all the keys at `prefix` at `hash` using the paged, safe RPC methods.
|
||||
async fn rpc_get_keys_paged(
|
||||
/// Get keys with `prefix` at `block` in a parallel manner.
|
||||
async fn rpc_get_keys_parallel(
|
||||
&self,
|
||||
prefix: StorageKey,
|
||||
at: B::Hash,
|
||||
prefix: &StorageKey,
|
||||
block: B::Hash,
|
||||
parallel: usize,
|
||||
) -> Result<Vec<StorageKey>, &'static str> {
|
||||
let mut last_key: Option<StorageKey> = None;
|
||||
let mut all_keys: Vec<StorageKey> = vec![];
|
||||
let keys = loop {
|
||||
/// Divide the workload and return the start key of each chunks. Guaranteed to return a
|
||||
/// non-empty list.
|
||||
fn gen_start_keys(prefix: &StorageKey) -> Vec<StorageKey> {
|
||||
let mut prefix = prefix.as_ref().to_vec();
|
||||
let scale = 32usize.saturating_sub(prefix.len());
|
||||
|
||||
// no need to divide workload
|
||||
if scale < 9 {
|
||||
prefix.extend(vec![0; scale]);
|
||||
return vec![StorageKey(prefix)]
|
||||
}
|
||||
|
||||
let chunks = 16;
|
||||
let step = 0x10000 / chunks;
|
||||
let ext = scale - 2;
|
||||
|
||||
(0..chunks)
|
||||
.map(|i| {
|
||||
let mut key = prefix.clone();
|
||||
let start = i * step;
|
||||
key.extend(vec![(start >> 8) as u8, (start & 0xff) as u8]);
|
||||
key.extend(vec![0; ext]);
|
||||
StorageKey(key)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
let start_keys = gen_start_keys(&prefix);
|
||||
let start_keys: Vec<Option<&StorageKey>> = start_keys.iter().map(Some).collect();
|
||||
let mut end_keys: Vec<Option<&StorageKey>> = start_keys[1..].to_vec();
|
||||
end_keys.push(None);
|
||||
|
||||
// use a semaphore to limit max scraping tasks
|
||||
let parallel = Arc::new(tokio::sync::Semaphore::new(parallel));
|
||||
let builder = Arc::new(self.clone());
|
||||
let mut handles = vec![];
|
||||
|
||||
for (start_key, end_key) in start_keys.into_iter().zip(end_keys) {
|
||||
let permit = parallel
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("semaphore is not closed until the end of loop");
|
||||
|
||||
let builder = builder.clone();
|
||||
let prefix = prefix.clone();
|
||||
let start_key = start_key.cloned();
|
||||
let end_key = end_key.cloned();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let res = builder
|
||||
.rpc_get_keys_in_range(&prefix, block, start_key.as_ref(), end_key.as_ref())
|
||||
.await;
|
||||
drop(permit);
|
||||
res
|
||||
});
|
||||
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
parallel.close();
|
||||
|
||||
let keys = futures::future::join_all(handles)
|
||||
.await
|
||||
.into_iter()
|
||||
.filter_map(|res| match res {
|
||||
Ok(Ok(keys)) => Some(keys),
|
||||
_ => None,
|
||||
})
|
||||
.flatten()
|
||||
.collect::<Vec<StorageKey>>();
|
||||
|
||||
Ok(keys)
|
||||
}
|
||||
|
||||
/// Get all keys with `prefix` within the given range at `block`.
|
||||
/// Both `start_key` and `end_key` are optional if you want an open-ended range.
|
||||
async fn rpc_get_keys_in_range(
|
||||
&self,
|
||||
prefix: &StorageKey,
|
||||
block: B::Hash,
|
||||
start_key: Option<&StorageKey>,
|
||||
end_key: Option<&StorageKey>,
|
||||
) -> Result<Vec<StorageKey>, &'static str> {
|
||||
let mut last_key: Option<&StorageKey> = start_key;
|
||||
let mut keys: Vec<StorageKey> = vec![];
|
||||
|
||||
loop {
|
||||
// This loop can hit the node with very rapid requests, occasionally causing it to
|
||||
// error out in CI (https://github.com/paritytech/substrate/issues/14129), so we retry.
|
||||
let retry_strategy =
|
||||
FixedInterval::new(Self::KEYS_PAGE_RETRY_INTERVAL).take(Self::MAX_RETRIES);
|
||||
let get_page_closure =
|
||||
|| self.get_keys_single_page(Some(prefix.clone()), last_key.clone(), at);
|
||||
let page = Retry::spawn(retry_strategy, get_page_closure).await?;
|
||||
|| self.get_keys_single_page(Some(prefix.clone()), last_key.cloned(), block);
|
||||
let mut page = Retry::spawn(retry_strategy, get_page_closure).await?;
|
||||
|
||||
// avoid duplicated keys across workloads
|
||||
if let (Some(last), Some(end)) = (page.last(), end_key) {
|
||||
if last >= end {
|
||||
page.retain(|key| key < end);
|
||||
}
|
||||
}
|
||||
|
||||
let page_len = page.len();
|
||||
keys.extend(page);
|
||||
last_key = keys.last();
|
||||
|
||||
all_keys.extend(page);
|
||||
|
||||
// scraping out of range or no more matches,
|
||||
// we are done either way
|
||||
if page_len < Self::DEFAULT_KEY_DOWNLOAD_PAGE as usize {
|
||||
log::debug!(target: LOG_TARGET, "last page received: {}", page_len);
|
||||
break all_keys
|
||||
} else {
|
||||
let new_last_key =
|
||||
all_keys.last().expect("all_keys is populated; has .last(); qed");
|
||||
log::debug!(
|
||||
target: LOG_TARGET,
|
||||
"new total = {}, full page received: {}",
|
||||
all_keys.len(),
|
||||
HexDisplay::from(new_last_key)
|
||||
);
|
||||
last_key = Some(new_last_key.clone());
|
||||
};
|
||||
};
|
||||
break
|
||||
}
|
||||
|
||||
log::debug!(
|
||||
target: LOG_TARGET,
|
||||
"new total = {}, full page received: {}",
|
||||
keys.len(),
|
||||
HexDisplay::from(last_key.expect("full page received, cannot be None"))
|
||||
);
|
||||
}
|
||||
|
||||
Ok(keys)
|
||||
}
|
||||
@@ -529,7 +624,7 @@ where
|
||||
"Batch request failed ({}/{} retries). Error: {}",
|
||||
retries,
|
||||
Self::MAX_RETRIES,
|
||||
e.to_string()
|
||||
e
|
||||
);
|
||||
// after 2 subsequent failures something very wrong is happening. log a warning
|
||||
// and reset the batch size down to 1.
|
||||
@@ -590,7 +685,7 @@ where
|
||||
/// map them to values one by one.
|
||||
///
|
||||
/// This can work with public nodes. But, expect it to be darn slow.
|
||||
pub(crate) async fn rpc_get_pairs_paged(
|
||||
pub(crate) async fn rpc_get_pairs(
|
||||
&self,
|
||||
prefix: StorageKey,
|
||||
at: B::Hash,
|
||||
@@ -598,8 +693,10 @@ where
|
||||
) -> Result<Vec<KeyValue>, &'static str> {
|
||||
let start = Instant::now();
|
||||
let mut sp = Spinner::with_timer(Spinners::Dots, "Scraping keys...".into());
|
||||
// TODO We could start downloading when having collected the first batch of keys
|
||||
// https://github.com/paritytech/polkadot-sdk/issues/2494
|
||||
let keys = self
|
||||
.rpc_get_keys_paged(prefix.clone(), at)
|
||||
.rpc_get_keys_parallel(&prefix, at, Self::PARALLEL_REQUESTS)
|
||||
.await?
|
||||
.into_iter()
|
||||
.collect::<Vec<_>>();
|
||||
@@ -628,9 +725,9 @@ where
|
||||
.unwrap()
|
||||
.progress_chars("=>-"),
|
||||
);
|
||||
let payloads_chunked = payloads.chunks((&payloads.len() / Self::PARALLEL_REQUESTS).max(1));
|
||||
let payloads_chunked = payloads.chunks((payloads.len() / Self::PARALLEL_REQUESTS).max(1));
|
||||
let requests = payloads_chunked.map(|payload_chunk| {
|
||||
Self::get_storage_data_dynamic_batch_size(&client, payload_chunk.to_vec(), &bar)
|
||||
Self::get_storage_data_dynamic_batch_size(client, payload_chunk.to_vec(), &bar)
|
||||
});
|
||||
// Execute the requests and move the Result outside.
|
||||
let storage_data_result: Result<Vec<_>, _> =
|
||||
@@ -644,7 +741,7 @@ where
|
||||
},
|
||||
};
|
||||
bar.finish_with_message("✅ Downloaded key values");
|
||||
print!("\n");
|
||||
println!();
|
||||
|
||||
// Check if we got responses for all submitted requests.
|
||||
assert_eq!(keys.len(), storage_data.len());
|
||||
@@ -778,8 +875,9 @@ where
|
||||
pending_ext: &mut TestExternalities<HashingFor<B>>,
|
||||
) -> Result<ChildKeyValues, &'static str> {
|
||||
let child_roots = top_kv
|
||||
.into_iter()
|
||||
.filter_map(|(k, _)| is_default_child_storage_key(k.as_ref()).then(|| k.clone()))
|
||||
.iter()
|
||||
.filter(|(k, _)| is_default_child_storage_key(k.as_ref()))
|
||||
.map(|(k, _)| k.clone())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if child_roots.is_empty() {
|
||||
@@ -799,11 +897,10 @@ where
|
||||
let mut child_kv = vec![];
|
||||
for prefixed_top_key in child_roots {
|
||||
let child_keys =
|
||||
Self::rpc_child_get_keys(&client, &prefixed_top_key, StorageKey(vec![]), at)
|
||||
.await?;
|
||||
Self::rpc_child_get_keys(client, &prefixed_top_key, StorageKey(vec![]), at).await?;
|
||||
|
||||
let child_kv_inner =
|
||||
Self::rpc_child_get_storage_paged(&client, &prefixed_top_key, child_keys, at)
|
||||
Self::rpc_child_get_storage_paged(client, &prefixed_top_key, child_keys, at)
|
||||
.await?;
|
||||
|
||||
let prefixed_top_key = PrefixedStorageKey::new(prefixed_top_key.clone().0);
|
||||
@@ -846,7 +943,7 @@ where
|
||||
for prefix in &config.hashed_prefixes {
|
||||
let now = std::time::Instant::now();
|
||||
let additional_key_values =
|
||||
self.rpc_get_pairs_paged(StorageKey(prefix.to_vec()), at, pending_ext).await?;
|
||||
self.rpc_get_pairs(StorageKey(prefix.to_vec()), at, pending_ext).await?;
|
||||
let elapsed = now.elapsed();
|
||||
log::info!(
|
||||
target: LOG_TARGET,
|
||||
@@ -1110,7 +1207,7 @@ mod test_prelude {
|
||||
pub(crate) type Block = RawBlock<ExtrinsicWrapper<Hash>>;
|
||||
|
||||
pub(crate) fn init_logger() {
|
||||
let _ = sp_tracing::try_init_simple();
|
||||
sp_tracing::try_init_simple();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1440,4 +1537,26 @@ mod remote_tests {
|
||||
.unwrap()
|
||||
.execute_with(|| {});
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_fetch_in_parallel() {
|
||||
init_logger();
|
||||
|
||||
let uri = String::from("wss://kusama-bridge-hub-rpc.polkadot.io:443");
|
||||
let mut builder = Builder::<Block>::new()
|
||||
.mode(Mode::Online(OnlineConfig { transport: uri.into(), ..Default::default() }));
|
||||
builder.init_remote_client().await.unwrap();
|
||||
|
||||
let at = builder.as_online().at.unwrap();
|
||||
|
||||
let prefix = StorageKey(vec![13]);
|
||||
let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
|
||||
let para = builder.rpc_get_keys_parallel(&prefix, at, 4).await.unwrap();
|
||||
assert_eq!(paged, para);
|
||||
|
||||
let prefix = StorageKey(vec![]);
|
||||
let paged = builder.rpc_get_keys_in_range(&prefix, at, None, None).await.unwrap();
|
||||
let para = builder.rpc_get_keys_parallel(&prefix, at, 8).await.unwrap();
|
||||
assert_eq!(paged, para);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user