mirror of
https://github.com/pezkuwichain/pezkuwi-subxt.git
synced 2026-04-27 15:07:59 +00:00
Make chain && state RPCs async (#3480)
* chain+state RPCs are async now * wrapped too long lines * create full/light RPC impls from service * use ordering * post-merge fix
This commit is contained in:
committed by
Gavin Wood
parent
816e132cd7
commit
607ee0a4e4
@@ -10,7 +10,8 @@ fnv = { version = "1.0", optional = true }
|
||||
log = { version = "0.4", optional = true }
|
||||
parking_lot = { version = "0.9.0", optional = true }
|
||||
hex = { package = "hex-literal", version = "0.2", optional = true }
|
||||
futures-preview = { version = "=0.3.0-alpha.17", optional = true }
|
||||
futures = { version = "0.1", optional = true }
|
||||
futures03 = { package = "futures-preview", version = "=0.3.0-alpha.17", features = ["compat"], optional = true }
|
||||
consensus = { package = "substrate-consensus-common", path = "../consensus/common", optional = true }
|
||||
executor = { package = "substrate-executor", path = "../executor", optional = true }
|
||||
state-machine = { package = "substrate-state-machine", path = "../state-machine", optional = true }
|
||||
@@ -49,7 +50,8 @@ std = [
|
||||
"fnv",
|
||||
"log",
|
||||
"hex",
|
||||
"futures-preview",
|
||||
"futures",
|
||||
"futures03",
|
||||
"executor",
|
||||
"state-machine",
|
||||
"keyring",
|
||||
|
||||
@@ -16,8 +16,10 @@
|
||||
|
||||
//! Substrate Client data backend
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use crate::error;
|
||||
use crate::light::blockchain::RemoteBlockchain;
|
||||
use primitives::ChangesTrieConfiguration;
|
||||
use sr_primitives::{generic::BlockId, Justification, StorageOverlay, ChildrenStorageOverlay};
|
||||
use sr_primitives::traits::{Block as BlockT, NumberFor};
|
||||
@@ -303,4 +305,7 @@ where
|
||||
{
|
||||
/// Returns true if the state for given block is available locally.
|
||||
fn is_local_state_available(&self, block: &BlockId<Block>) -> bool;
|
||||
/// Returns reference to blockchain backend that either resolves blockchain data
|
||||
/// locally, or prepares request to fetch that data from remote node.
|
||||
fn remote_blockchain(&self) -> Arc<dyn RemoteBlockchain<Block>>;
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use std::{
|
||||
panic::UnwindSafe, result, cell::RefCell, rc::Rc,
|
||||
};
|
||||
use log::{info, trace, warn};
|
||||
use futures::channel::mpsc;
|
||||
use futures03::channel::mpsc;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use codec::{Encode, Decode};
|
||||
use hash_db::{Hasher, Prefix};
|
||||
|
||||
@@ -718,6 +718,10 @@ where
|
||||
.map(|num| num.is_zero())
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn remote_blockchain(&self) -> Arc<dyn crate::light::blockchain::RemoteBlockchain<Block>> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
/// Prunable in-memory changes trie storage.
|
||||
|
||||
@@ -231,8 +231,8 @@ impl<S, F, Block, H> ClientBackend<Block, H> for Backend<S, F, H> where
|
||||
impl<S, F, Block, H> RemoteBackend<Block, H> for Backend<S, F, H>
|
||||
where
|
||||
Block: BlockT,
|
||||
S: BlockchainStorage<Block>,
|
||||
F: Fetcher<Block>,
|
||||
S: BlockchainStorage<Block> + 'static,
|
||||
F: Fetcher<Block> + 'static,
|
||||
H: Hasher<Out=Block::Hash>,
|
||||
H::Out: Ord,
|
||||
{
|
||||
@@ -242,6 +242,10 @@ where
|
||||
.map(|num| num.is_zero())
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn remote_blockchain(&self) -> Arc<dyn crate::light::blockchain::RemoteBlockchain<Block>> {
|
||||
self.blockchain.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, F, Block, H> BlockImportOperation<Block, H> for ImportOperation<Block, S, F, H>
|
||||
@@ -358,7 +362,7 @@ where
|
||||
*self.cached_header.write() = Some(cached_header);
|
||||
}
|
||||
|
||||
futures::executor::block_on(
|
||||
futures03::executor::block_on(
|
||||
self.fetcher.upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
|
||||
.remote_read(RemoteReadRequest {
|
||||
block: self.block,
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
//! Light client blockchain backend. Only stores headers and justifications of recent
|
||||
//! blocks. CHT roots are stored for headers of ancient blocks.
|
||||
|
||||
use std::future::Future;
|
||||
use std::{sync::{Weak, Arc}, collections::HashMap};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
@@ -72,6 +73,27 @@ pub trait Storage<Block: BlockT>: AuxStore + BlockchainHeaderBackend<Block> {
|
||||
fn cache(&self) -> Option<Arc<dyn BlockchainCache<Block>>>;
|
||||
}
|
||||
|
||||
/// Remote header.
|
||||
#[derive(Debug)]
|
||||
pub enum LocalOrRemote<Data, Request> {
|
||||
/// When data is available locally, it is returned.
|
||||
Local(Data),
|
||||
/// When data is unavailable locally, the request to fetch it from remote node is returned.
|
||||
Remote(Request),
|
||||
/// When data is unknown.
|
||||
Unknown,
|
||||
}
|
||||
|
||||
/// Futures-based blockchain backend that either resolves blockchain data
|
||||
/// locally, or fetches required data from remote node.
|
||||
pub trait RemoteBlockchain<Block: BlockT>: Send + Sync {
|
||||
/// Get block header.
|
||||
fn header(&self, id: BlockId<Block>) -> ClientResult<LocalOrRemote<
|
||||
Block::Header,
|
||||
RemoteHeaderRequest<Block::Header>,
|
||||
>>;
|
||||
}
|
||||
|
||||
/// Light client blockchain.
|
||||
pub struct Blockchain<S, F> {
|
||||
fetcher: Mutex<Weak<F>>,
|
||||
@@ -105,32 +127,10 @@ impl<S, F> Blockchain<S, F> {
|
||||
|
||||
impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Block: BlockT, S: Storage<Block>, F: Fetcher<Block> {
|
||||
fn header(&self, id: BlockId<Block>) -> ClientResult<Option<Block::Header>> {
|
||||
match self.storage.header(id)? {
|
||||
Some(header) => Ok(Some(header)),
|
||||
None => {
|
||||
let number = match id {
|
||||
BlockId::Hash(hash) => match self.storage.number(hash)? {
|
||||
Some(number) => number,
|
||||
None => return Ok(None),
|
||||
},
|
||||
BlockId::Number(number) => number,
|
||||
};
|
||||
|
||||
// if the header is from future or genesis (we never prune genesis) => return
|
||||
if number.is_zero() || self.storage.status(BlockId::Number(number))? == BlockStatus::Unknown {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
futures::executor::block_on(
|
||||
self.fetcher().upgrade()
|
||||
.ok_or(ClientError::NotAvailableOnLightClient)?
|
||||
.remote_header(RemoteHeaderRequest {
|
||||
cht_root: self.storage.header_cht_root(cht::size(), number)?,
|
||||
block: number,
|
||||
retry_count: None,
|
||||
})
|
||||
).map(Some)
|
||||
}
|
||||
match RemoteBlockchain::header(self, id)? {
|
||||
LocalOrRemote::Local(header) => Ok(Some(header)),
|
||||
LocalOrRemote::Remote(_) => Err(ClientError::NotAvailableOnLightClient),
|
||||
LocalOrRemote::Unknown => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,12 +153,12 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
|
||||
|
||||
impl<S, F, Block> BlockchainBackend<Block> for Blockchain<S, F> where Block: BlockT, S: Storage<Block>, F: Fetcher<Block> {
|
||||
fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
|
||||
let header = match self.header(id)? {
|
||||
let header = match BlockchainHeaderBackend::header(self, id)? {
|
||||
Some(header) => header,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
futures::executor::block_on(
|
||||
futures03::executor::block_on(
|
||||
self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
|
||||
.remote_body(RemoteBodyRequest {
|
||||
header,
|
||||
@@ -194,6 +194,62 @@ impl<S: Storage<Block>, F, Block: BlockT> ProvideCache<Block> for Blockchain<S,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, F, Block: BlockT> RemoteBlockchain<Block> for Blockchain<S, F>
|
||||
where
|
||||
S: Storage<Block>,
|
||||
F: Fetcher<Block> + Send + Sync,
|
||||
{
|
||||
fn header(&self, id: BlockId<Block>) -> ClientResult<LocalOrRemote<
|
||||
Block::Header,
|
||||
RemoteHeaderRequest<Block::Header>,
|
||||
>> {
|
||||
// first, try to read header from local storage
|
||||
if let Some(local_header) = self.storage.header(id)? {
|
||||
return Ok(LocalOrRemote::Local(local_header));
|
||||
}
|
||||
|
||||
// we need to know block number to check if it's a part of CHT
|
||||
let number = match id {
|
||||
BlockId::Hash(hash) => match self.storage.number(hash)? {
|
||||
Some(number) => number,
|
||||
None => return Ok(LocalOrRemote::Unknown),
|
||||
},
|
||||
BlockId::Number(number) => number,
|
||||
};
|
||||
|
||||
// if the header is genesis (never pruned), non-canonical, or from future => return
|
||||
if number.is_zero() || self.storage.status(BlockId::Number(number))? == BlockStatus::Unknown {
|
||||
return Ok(LocalOrRemote::Unknown);
|
||||
}
|
||||
|
||||
Ok(LocalOrRemote::Remote(RemoteHeaderRequest {
|
||||
cht_root: self.storage.header_cht_root(cht::size(), number)?,
|
||||
block: number,
|
||||
retry_count: None,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns future that resolves header either locally, or remotely.
|
||||
pub fn future_header<Block: BlockT, F: Fetcher<Block>>(
|
||||
blockchain: &dyn RemoteBlockchain<Block>,
|
||||
fetcher: &F,
|
||||
id: BlockId<Block>,
|
||||
) -> impl Future<Output = Result<Option<Block::Header>, ClientError>> {
|
||||
use futures03::future::{ready, Either, FutureExt};
|
||||
|
||||
match blockchain.header(id) {
|
||||
Ok(LocalOrRemote::Remote(request)) => Either::Left(
|
||||
fetcher
|
||||
.remote_header(request)
|
||||
.then(|header| ready(header.map(Some)))
|
||||
),
|
||||
Ok(LocalOrRemote::Unknown) => Either::Right(ready(Ok(None))),
|
||||
Ok(LocalOrRemote::Local(local_header)) => Either::Right(ready(Ok(Some(local_header)))),
|
||||
Err(err) => Either::Right(ready(Err(err))),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use std::collections::HashMap;
|
||||
|
||||
@@ -99,7 +99,7 @@ where
|
||||
let block_hash = self.blockchain.expect_block_hash_from_id(id)?;
|
||||
let block_header = self.blockchain.expect_header(id.clone())?;
|
||||
|
||||
futures::executor::block_on(self.fetcher.remote_call(RemoteCallRequest {
|
||||
futures03::executor::block_on(self.fetcher.remote_call(RemoteCallRequest {
|
||||
block: block_hash,
|
||||
header: block_header,
|
||||
method: method.into(),
|
||||
|
||||
@@ -144,15 +144,15 @@ pub struct RemoteBodyRequest<Header: HeaderT> {
|
||||
/// is correct (see FetchedDataChecker) and return already checked data.
|
||||
pub trait Fetcher<Block: BlockT>: Send + Sync {
|
||||
/// Remote header future.
|
||||
type RemoteHeaderResult: Future<Output = Result<Block::Header, ClientError>>;
|
||||
type RemoteHeaderResult: Future<Output = Result<Block::Header, ClientError>> + Send + 'static;
|
||||
/// Remote storage read future.
|
||||
type RemoteReadResult: Future<Output = Result<Option<Vec<u8>>, ClientError>>;
|
||||
type RemoteReadResult: Future<Output = Result<Option<Vec<u8>>, ClientError>> + Send + 'static;
|
||||
/// Remote call result future.
|
||||
type RemoteCallResult: Future<Output = Result<Vec<u8>, ClientError>>;
|
||||
type RemoteCallResult: Future<Output = Result<Vec<u8>, ClientError>> + Send + 'static;
|
||||
/// Remote changes result future.
|
||||
type RemoteChangesResult: Future<Output = Result<Vec<(NumberFor<Block>, u32)>, ClientError>>;
|
||||
type RemoteChangesResult: Future<Output = Result<Vec<(NumberFor<Block>, u32)>, ClientError>> + Send + 'static;
|
||||
/// Remote block body result future.
|
||||
type RemoteBodyResult: Future<Output = Result<Vec<Block::Extrinsic>, ClientError>>;
|
||||
type RemoteBodyResult: Future<Output = Result<Vec<Block::Extrinsic>, ClientError>> + Send + 'static;
|
||||
|
||||
/// Fetch remote header.
|
||||
fn remote_header(&self, request: RemoteHeaderRequest<Block::Header>) -> Self::RemoteHeaderResult;
|
||||
@@ -493,7 +493,7 @@ impl<'a, H, Number, Hash> ChangesTrieRootsStorage<H, Number> for RootsStorage<'a
|
||||
|
||||
#[cfg(test)]
|
||||
pub mod tests {
|
||||
use futures::future::Ready;
|
||||
use futures03::future::Ready;
|
||||
use parking_lot::Mutex;
|
||||
use codec::Decode;
|
||||
use crate::client::tests::prepare_client_with_key_changes;
|
||||
@@ -521,7 +521,7 @@ pub mod tests {
|
||||
where
|
||||
E: std::convert::From<&'static str>,
|
||||
{
|
||||
futures::future::ready(Err("Not implemented on test node".into()))
|
||||
futures03::future::ready(Err("Not implemented on test node".into()))
|
||||
}
|
||||
|
||||
impl Fetcher<Block> for OkCallFetcher {
|
||||
@@ -544,7 +544,7 @@ pub mod tests {
|
||||
}
|
||||
|
||||
fn remote_call(&self, _request: RemoteCallRequest<Header>) -> Self::RemoteCallResult {
|
||||
futures::future::ready(Ok((*self.lock()).clone()))
|
||||
futures03::future::ready(Ok((*self.lock()).clone()))
|
||||
}
|
||||
|
||||
fn remote_changes(&self, _request: RemoteChangesRequest<Header>) -> Self::RemoteChangesResult {
|
||||
|
||||
@@ -67,8 +67,8 @@ pub fn new_light<B, S, F, GS, RA, E>(
|
||||
>, B, RA>>
|
||||
where
|
||||
B: BlockT<Hash=H256>,
|
||||
S: BlockchainStorage<B>,
|
||||
F: Fetcher<B>,
|
||||
S: BlockchainStorage<B> + 'static,
|
||||
F: Fetcher<B> + 'static,
|
||||
GS: BuildStorage,
|
||||
E: CodeExecutor<Blake2Hasher> + RuntimeInfo,
|
||||
{
|
||||
|
||||
@@ -22,7 +22,7 @@ use std::{
|
||||
};
|
||||
|
||||
use fnv::{FnvHashSet, FnvHashMap};
|
||||
use futures::channel::mpsc;
|
||||
use futures03::channel::mpsc;
|
||||
use primitives::storage::{StorageKey, StorageData};
|
||||
use sr_primitives::traits::Block as BlockT;
|
||||
|
||||
@@ -347,7 +347,7 @@ mod tests {
|
||||
// given
|
||||
let mut notifications = StorageNotifications::<Block>::default();
|
||||
let child_filter = [(StorageKey(vec![4]), None)];
|
||||
let mut recv = futures::executor::block_on_stream(
|
||||
let mut recv = futures03::executor::block_on_stream(
|
||||
notifications.listen(None, Some(&child_filter[..]))
|
||||
);
|
||||
|
||||
@@ -382,13 +382,13 @@ mod tests {
|
||||
// given
|
||||
let mut notifications = StorageNotifications::<Block>::default();
|
||||
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
|
||||
let mut recv1 = futures::executor::block_on_stream(
|
||||
let mut recv1 = futures03::executor::block_on_stream(
|
||||
notifications.listen(Some(&[StorageKey(vec![1])]), None)
|
||||
);
|
||||
let mut recv2 = futures::executor::block_on_stream(
|
||||
let mut recv2 = futures03::executor::block_on_stream(
|
||||
notifications.listen(Some(&[StorageKey(vec![2])]), None)
|
||||
);
|
||||
let mut recv3 = futures::executor::block_on_stream(
|
||||
let mut recv3 = futures03::executor::block_on_stream(
|
||||
notifications.listen(Some(&[]), Some(&child_filter))
|
||||
);
|
||||
|
||||
@@ -429,16 +429,16 @@ mod tests {
|
||||
let mut notifications = StorageNotifications::<Block>::default();
|
||||
{
|
||||
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
|
||||
let _recv1 = futures::executor::block_on_stream(
|
||||
let _recv1 = futures03::executor::block_on_stream(
|
||||
notifications.listen(Some(&[StorageKey(vec![1])]), None)
|
||||
);
|
||||
let _recv2 = futures::executor::block_on_stream(
|
||||
let _recv2 = futures03::executor::block_on_stream(
|
||||
notifications.listen(Some(&[StorageKey(vec![2])]), None)
|
||||
);
|
||||
let _recv3 = futures::executor::block_on_stream(
|
||||
let _recv3 = futures03::executor::block_on_stream(
|
||||
notifications.listen(None, None)
|
||||
);
|
||||
let _recv4 = futures::executor::block_on_stream(
|
||||
let _recv4 = futures03::executor::block_on_stream(
|
||||
notifications.listen(None, Some(&child_filter))
|
||||
);
|
||||
assert_eq!(notifications.listeners.len(), 2);
|
||||
@@ -465,7 +465,7 @@ mod tests {
|
||||
// given
|
||||
let mut recv = {
|
||||
let mut notifications = StorageNotifications::<Block>::default();
|
||||
let recv = futures::executor::block_on_stream(notifications.listen(None, None));
|
||||
let recv = futures03::executor::block_on_stream(notifications.listen(None, None));
|
||||
|
||||
// when
|
||||
let changeset = vec![];
|
||||
|
||||
Reference in New Issue
Block a user