Fix state RPC subscriptions on light node (#3626)

* fetch all keys at once in light RPC subscriptions

* restore lost fil
This commit is contained in:
Svyatoslav Nikolsky
2019-10-07 09:00:15 +03:00
committed by Gavin Wood
parent 168051c060
commit 32e687abe4
6 changed files with 714 additions and 207 deletions
+1
View File
@@ -5224,6 +5224,7 @@ dependencies = [
"jsonrpc-pubsub 13.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-scale-codec 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rustc-hex 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)",
"sr-io 2.0.0",
+1
View File
@@ -23,6 +23,7 @@ substrate-executor = { path = "../executor" }
substrate-keystore = { path = "../keystore" }
transaction_pool = { package = "substrate-transaction-pool", path = "../transaction-pool" }
hash-db = { version = "0.15.2", default-features = false }
parking_lot = { version = "0.9.0" }
[dev-dependencies]
assert_matches = "1.3.0"
+5 -2
View File
@@ -74,13 +74,14 @@ impl Subscriptions {
/// Second parameter is a function that converts Subscriber sink into a future.
/// This future will be driven to completion by the underlying event loop
/// or will be cancelled in case #cancel is invoked.
pub fn add<T, E, G, R, F>(&self, subscriber: Subscriber<T, E>, into_future: G) where
pub fn add<T, E, G, R, F>(&self, subscriber: Subscriber<T, E>, into_future: G) -> SubscriptionId where
G: FnOnce(Sink<T, E>) -> R,
R: future::IntoFuture<Future=F, Item=(), Error=()>,
F: future::Future<Item=(), Error=()> + Send + 'static,
{
let id = self.next_id.next_id();
if let Ok(sink) = subscriber.assign_id(id.into()) {
let subscription_id: SubscriptionId = id.into();
if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
let (tx, rx) = oneshot::channel();
let future = into_future(sink)
.into_future()
@@ -92,6 +93,8 @@ impl Subscriptions {
error!("Failed to spawn RPC subscription task");
}
}
subscription_id
}
/// Cancel subscription.
+8 -111
View File
@@ -23,27 +23,24 @@ mod state_light;
mod tests;
use std::sync::Arc;
use futures03::{future, StreamExt as _, TryStreamExt as _};
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn;
use rpc::{
Result as RpcResult,
futures::{stream, Future, Sink, Stream},
futures::Future,
};
use api::Subscriptions;
use client::{
BlockchainEvents, Client, CallExecutor,
Client, CallExecutor,
runtime_api::Metadata,
light::{blockchain::RemoteBlockchain, fetcher::Fetcher},
};
use primitives::{
Blake2Hasher, Bytes, H256,
storage::{well_known_keys, StorageKey, StorageData, StorageChangeSet},
storage::{StorageKey, StorageData, StorageChangeSet},
};
use runtime_version::RuntimeVersion;
use sr_primitives::{
generic::BlockId,
traits::{Block as BlockT, ProvideRuntimeApi},
};
@@ -59,12 +56,6 @@ pub trait StateBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
E: client::CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
RA: Send + Sync + 'static,
{
/// Get client reference.
fn client(&self) -> &Arc<Client<B, E, Block, RA>>;
/// Get subscriptions reference.
fn subscriptions(&self) -> &Subscriptions;
/// Call runtime method at given block.
fn call(
&self,
@@ -161,123 +152,29 @@ pub trait StateBackend<B, E, Block: BlockT, RA>: Send + Sync + 'static
&self,
_meta: crate::metadata::Metadata,
subscriber: Subscriber<RuntimeVersion>,
) {
let stream = match self.client().storage_changes_notification_stream(
Some(&[StorageKey(well_known_keys::CODE.to_vec())]),
None,
) {
Ok(stream) => stream,
Err(err) => {
let _ = subscriber.reject(Error::from(client_err(err)).into());
return;
}
};
self.subscriptions().add(subscriber, |sink| {
let version = self.runtime_version(None.into())
.map_err(Into::into)
.wait();
let client = self.client().clone();
let mut previous_version = version.clone();
let stream = stream
.filter_map(move |_| {
let info = client.info();
let version = client
.runtime_version_at(&BlockId::hash(info.chain.best_hash))
.map_err(client_err)
.map_err(Into::into);
if previous_version != version {
previous_version = version.clone();
future::ready(Some(Ok::<_, ()>(version)))
} else {
future::ready(None)
}
})
.compat();
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(
stream::iter_result(vec![Ok(version)])
.chain(stream)
)
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
}
);
/// Unsubscribe from runtime version subscription
fn unsubscribe_runtime_version(
&self,
_meta: Option<crate::metadata::Metadata>,
id: SubscriptionId,
) -> RpcResult<bool> {
Ok(self.subscriptions().cancel(id))
}
) -> RpcResult<bool>;
/// New storage subscription
fn subscribe_storage(
&self,
_meta: crate::metadata::Metadata,
subscriber: Subscriber<StorageChangeSet<Block::Hash>>,
keys: Option<Vec<StorageKey>>
) {
let keys = Into::<Option<Vec<_>>>::into(keys);
let stream = match self.client().storage_changes_notification_stream(
keys.as_ref().map(|x| &**x),
None
) {
Ok(stream) => stream,
Err(err) => {
let _ = subscriber.reject(client_err(err).into());
return;
},
};
// initial values
let initial = stream::iter_result(keys
.map(|keys| {
let block = self.client().info().chain.best_hash;
let changes = keys
.into_iter()
.map(|key| self.storage(Some(block.clone()).into(), key.clone())
.map(|val| (key.clone(), val))
.wait()
.unwrap_or_else(|_| (key, None))
)
.collect();
vec![Ok(Ok(StorageChangeSet { block, changes }))]
}).unwrap_or_default());
self.subscriptions().add(subscriber, |sink| {
let stream = stream
.map(|(block, changes)| Ok::<_, ()>(Ok(StorageChangeSet {
block,
changes: changes.iter()
.filter_map(|(o_sk, k, v)| if o_sk.is_none() {
Some((k.clone(),v.cloned()))
} else { None }).collect(),
})))
.compat();
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(initial.chain(stream))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
})
}
keys: Option<Vec<StorageKey>>,
);
/// Unsubscribe from storage subscription
fn unsubscribe_storage(
&self,
_meta: Option<crate::metadata::Metadata>,
id: SubscriptionId,
) -> RpcResult<bool> {
Ok(self.subscriptions().cancel(id))
}
) -> RpcResult<bool>;
}
/// Create new state API that works on full node.
+130 -12
View File
@@ -19,16 +19,22 @@
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::ops::Range;
use rpc::futures::future::result;
use futures03::{future, StreamExt as _, TryStreamExt as _};
use log::warn;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use rpc::{
Result as RpcResult,
futures::{stream, Future, Sink, Stream, future::result},
};
use api::Subscriptions;
use client::{
Client, CallExecutor, runtime_api::Metadata,
Client, CallExecutor, BlockchainEvents, runtime_api::Metadata,
backend::Backend, error::Result as ClientResult,
};
use primitives::{
H256, Blake2Hasher, Bytes, offchain::NeverOffchainExt,
storage::{StorageKey, StorageData, StorageChangeSet},
storage::{well_known_keys, StorageKey, StorageData, StorageChangeSet},
};
use runtime_version::RuntimeVersion;
use state_machine::ExecutionStrategy;
@@ -53,6 +59,7 @@ struct QueryStorageRange<Block: BlockT> {
pub filtered_range: Option<Range<usize>>,
}
/// State API backend for full nodes.
pub struct FullState<B, E, Block: BlockT, RA> {
client: Arc<Client<B, E, Block, RA>>,
subscriptions: Subscriptions,
@@ -64,7 +71,7 @@ impl<B, E, Block: BlockT, RA> FullState<B, E, Block, RA>
B: Backend<Block, Blake2Hasher> + Send + Sync + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static + Clone,
{
///
/// Create new state API backend for full nodes.
pub fn new(client: Arc<Client<B, E, Block, RA>>, subscriptions: Subscriptions) -> Self {
Self { client, subscriptions }
}
@@ -225,14 +232,6 @@ impl<B, E, Block, RA> StateBackend<B, E, Block, RA> for FullState<B, E, Block, R
Client<B, E, Block, RA>: ProvideRuntimeApi,
<Client<B, E, Block, RA> as ProvideRuntimeApi>::Api: Metadata<Block>,
{
fn client(&self) -> &Arc<Client<B, E, Block, RA>> {
&self.client
}
fn subscriptions(&self) -> &Subscriptions {
&self.subscriptions
}
fn call(
&self,
block: Option<Block::Hash>,
@@ -352,6 +351,125 @@ impl<B, E, Block, RA> StateBackend<B, E, Block, RA> for FullState<B, E, Block, R
};
Box::new(result(call_fn()))
}
fn subscribe_runtime_version(
&self,
_meta: crate::metadata::Metadata,
subscriber: Subscriber<RuntimeVersion>,
) {
let stream = match self.client.storage_changes_notification_stream(
Some(&[StorageKey(well_known_keys::CODE.to_vec())]),
None,
) {
Ok(stream) => stream,
Err(err) => {
let _ = subscriber.reject(Error::from(client_err(err)).into());
return;
}
};
self.subscriptions.add(subscriber, |sink| {
let version = self.runtime_version(None.into())
.map_err(Into::into)
.wait();
let client = self.client.clone();
let mut previous_version = version.clone();
let stream = stream
.filter_map(move |_| {
let info = client.info();
let version = client
.runtime_version_at(&BlockId::hash(info.chain.best_hash))
.map_err(client_err)
.map_err(Into::into);
if previous_version != version {
previous_version = version.clone();
future::ready(Some(Ok::<_, ()>(version)))
} else {
future::ready(None)
}
})
.compat();
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(
stream::iter_result(vec![Ok(version)])
.chain(stream)
)
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
}
fn unsubscribe_runtime_version(
&self,
_meta: Option<crate::metadata::Metadata>,
id: SubscriptionId,
) -> RpcResult<bool> {
Ok(self.subscriptions.cancel(id))
}
fn subscribe_storage(
&self,
_meta: crate::metadata::Metadata,
subscriber: Subscriber<StorageChangeSet<Block::Hash>>,
keys: Option<Vec<StorageKey>>,
) {
let keys = Into::<Option<Vec<_>>>::into(keys);
let stream = match self.client.storage_changes_notification_stream(
keys.as_ref().map(|x| &**x),
None
) {
Ok(stream) => stream,
Err(err) => {
let _ = subscriber.reject(client_err(err).into());
return;
},
};
// initial values
let initial = stream::iter_result(keys
.map(|keys| {
let block = self.client.info().chain.best_hash;
let changes = keys
.into_iter()
.map(|key| self.storage(Some(block.clone()).into(), key.clone())
.map(|val| (key.clone(), val))
.wait()
.unwrap_or_else(|_| (key, None))
)
.collect();
vec![Ok(Ok(StorageChangeSet { block, changes }))]
}).unwrap_or_default());
self.subscriptions.add(subscriber, |sink| {
let stream = stream
.map(|(block, changes)| Ok::<_, ()>(Ok(StorageChangeSet {
block,
changes: changes.iter()
.filter_map(|(o_sk, k, v)| if o_sk.is_none() {
Some((k.clone(),v.cloned()))
} else { None }).collect(),
})))
.compat();
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(initial.chain(stream))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
}
fn unsubscribe_storage(
&self,
_meta: Option<crate::metadata::Metadata>,
id: SubscriptionId,
) -> RpcResult<bool> {
Ok(self.subscriptions.cancel(id))
}
}
/// Splits passed range into two subranges where:
+569 -82
View File
@@ -16,19 +16,31 @@
//! State API backend for light nodes.
use std::sync::Arc;
use std::{
sync::Arc,
collections::{HashSet, HashMap, hash_map::Entry},
};
use codec::Decode;
use futures03::{future::{ready, Either}, FutureExt, TryFutureExt};
use futures03::{
future::{ready, Either},
channel::oneshot::{channel, Sender},
FutureExt, TryFutureExt,
StreamExt as _, TryStreamExt as _,
};
use hash_db::Hasher;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn;
use parking_lot::Mutex;
use rpc::{
Result as RpcResult,
futures::Sink,
futures::future::{result, Future},
futures::stream::Stream,
};
use api::Subscriptions;
use client::{
Client, CallExecutor, backend::Backend,
BlockchainEvents, Client, CallExecutor, backend::Backend,
error::Error as ClientError,
light::{
blockchain::{future_header, RemoteBlockchain},
@@ -42,18 +54,89 @@ use primitives::{
use runtime_version::RuntimeVersion;
use sr_primitives::{
generic::BlockId,
traits::{Block as BlockT, Header as HeaderT},
traits::Block as BlockT,
};
use super::{StateBackend, error::{FutureResult, Error}, client_err};
/// Storage data map of storage keys => (optional) storage value.
type StorageMap = HashMap<StorageKey, Option<StorageData>>;
/// State API backend for light nodes.
pub struct LightState<Block: BlockT, F: Fetcher<Block>, B, E, RA> {
client: Arc<Client<B, E, Block, RA>>,
subscriptions: Subscriptions,
version_subscriptions: SimpleSubscriptions<Block::Hash, RuntimeVersion>,
storage_subscriptions: Arc<Mutex<StorageSubscriptions<Block>>>,
remote_blockchain: Arc<dyn RemoteBlockchain<Block>>,
fetcher: Arc<F>,
}
/// Shared requests container.
trait SharedRequests<Hash, V>: Clone + Send + Sync {
/// Tries to listen for already issued request, or issues request.
///
/// Returns true if requests has been issued.
fn listen_request(
&self,
block: Hash,
sender: Sender<Result<V, ()>>,
) -> bool;
/// Returns (and forgets) all listeners for given request.
fn on_response_received(&self, block: Hash) -> Vec<Sender<Result<V, ()>>>;
}
/// Storage subscriptions data.
struct StorageSubscriptions<Block: BlockT> {
/// Active storage requests.
active_requests: HashMap<Block::Hash, Vec<Sender<Result<StorageMap, ()>>>>,
/// Map of subscription => keys that this subscription watch for.
keys_by_subscription: HashMap<SubscriptionId, HashSet<StorageKey>>,
/// Map of key => set of subscriptions that watch this key.
subscriptions_by_key: HashMap<StorageKey, HashSet<SubscriptionId>>,
}
impl<Block: BlockT> SharedRequests<Block::Hash, StorageMap> for Arc<Mutex<StorageSubscriptions<Block>>> {
fn listen_request(
&self,
block: Block::Hash,
sender: Sender<Result<StorageMap, ()>>,
) -> bool {
let mut subscriptions = self.lock();
let active_requests_at = subscriptions.active_requests.entry(block).or_default();
active_requests_at.push(sender);
active_requests_at.len() == 1
}
fn on_response_received(&self, block: Block::Hash) -> Vec<Sender<Result<StorageMap, ()>>> {
self.lock().active_requests.remove(&block).unwrap_or_default()
}
}
/// Simple, maybe shared, subscription data that shares per block requests.
type SimpleSubscriptions<Hash, V> = Arc<Mutex<HashMap<Hash, Vec<Sender<Result<V, ()>>>>>>;
impl<Hash, V> SharedRequests<Hash, V> for SimpleSubscriptions<Hash, V> where
Hash: Send + Eq + std::hash::Hash,
V: Send,
{
fn listen_request(
&self,
block: Hash,
sender: Sender<Result<V, ()>>,
) -> bool {
let mut subscriptions = self.lock();
let active_requests_at = subscriptions.entry(block).or_default();
active_requests_at.push(sender);
active_requests_at.len() == 1
}
fn on_response_received(&self, block: Hash) -> Vec<Sender<Result<V, ()>>> {
self.lock().remove(&block).unwrap_or_default()
}
}
impl<Block: BlockT, F: Fetcher<Block> + 'static, B, E, RA> LightState<Block, F, B, E, RA>
where
Block: BlockT<Hash=H256>,
@@ -61,39 +144,31 @@ impl<Block: BlockT, F: Fetcher<Block> + 'static, B, E, RA> LightState<Block, F,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static + Clone,
RA: Send + Sync + 'static,
{
///
/// Create new state API backend for light nodes.
pub fn new(
client: Arc<Client<B, E, Block, RA>>,
subscriptions: Subscriptions,
remote_blockchain: Arc<dyn RemoteBlockchain<Block>>,
fetcher: Arc<F>,
) -> Self {
Self { client, subscriptions, remote_blockchain, fetcher, }
Self {
client,
subscriptions,
version_subscriptions: Arc::new(Mutex::new(HashMap::new())),
storage_subscriptions: Arc::new(Mutex::new(StorageSubscriptions {
active_requests: HashMap::new(),
keys_by_subscription: HashMap::new(),
subscriptions_by_key: HashMap::new(),
})),
remote_blockchain,
fetcher,
}
}
/// Returns given block hash or best block hash if None is passed.
fn block_or_best(&self, hash: Option<Block::Hash>) -> Block::Hash {
hash.unwrap_or_else(|| self.client.info().chain.best_hash)
}
/// Resolve header by hash.
fn resolve_header(
&self,
block: Option<Block::Hash>,
) -> impl std::future::Future<Output = Result<Block::Header, Error>> {
let block = self.block_or_best(block);
let maybe_header = future_header(
&*self.remote_blockchain,
&*self.fetcher,
BlockId::Hash(block),
);
maybe_header.then(move |result|
ready(result.and_then(|maybe_header|
maybe_header.ok_or(ClientError::UnknownBlock(format!("{}", block)))
).map_err(client_err)),
)
}
}
impl<Block, F, B, E, RA> StateBackend<B, E, Block, RA> for LightState<Block, F, B, E, RA>
@@ -104,34 +179,19 @@ impl<Block, F, B, E, RA> StateBackend<B, E, Block, RA> for LightState<Block, F,
RA: Send + Sync + 'static,
F: Fetcher<Block> + 'static
{
fn client(&self) -> &Arc<Client<B, E, Block, RA>> {
&self.client
}
fn subscriptions(&self) -> &Subscriptions {
&self.subscriptions
}
fn call(
&self,
block: Option<Block::Hash>,
method: String,
call_data: Bytes,
) -> FutureResult<Bytes> {
let fetcher = self.fetcher.clone();
let call_result = self.resolve_header(block)
.then(move |result| match result {
Ok(header) => Either::Left(fetcher.remote_call(RemoteCallRequest {
block: header.hash(),
header,
method,
call_data: call_data.0,
retry_count: Default::default(),
}).then(|result| ready(result.map(Bytes).map_err(client_err)))),
Err(error) => Either::Right(ready(Err(error))),
});
Box::new(call_result.boxed().compat())
Box::new(call(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
method,
call_data,
).boxed().compat())
}
fn storage_keys(
@@ -147,26 +207,15 @@ impl<Block, F, B, E, RA> StateBackend<B, E, Block, RA> for LightState<Block, F,
block: Option<Block::Hash>,
key: StorageKey,
) -> FutureResult<Option<StorageData>> {
let fetcher = self.fetcher.clone();
let storage = self.resolve_header(block)
.then(move |result| match result {
Ok(header) => Either::Left(fetcher.remote_read(RemoteReadRequest {
block: header.hash(),
header,
keys: vec![key.0.clone()],
retry_count: Default::default(),
}).then(move |result| ready(result
.map(|mut data| data
.remove(&key.0)
.expect("successful result has entry for all keys; qed")
.map(StorageData)
)
.map_err(client_err)
))),
Err(error) => Either::Right(ready(Err(error))),
});
Box::new(storage.boxed().compat())
Box::new(storage(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
vec![key.0.clone()],
).boxed().compat().map(move |mut values| values
.remove(&key)
.expect("successful request has entries for all requested keys; qed")
))
}
fn storage_hash(
@@ -197,11 +246,12 @@ impl<Block, F, B, E, RA> StateBackend<B, E, Block, RA> for LightState<Block, F,
child_storage_key: StorageKey,
key: StorageKey,
) -> FutureResult<Option<StorageData>> {
let block = self.block_or_best(block);
let fetcher = self.fetcher.clone();
let child_storage = self.resolve_header(block)
let child_storage = resolve_header(&*self.remote_blockchain, &*self.fetcher, block)
.then(move |result| match result {
Ok(header) => Either::Left(fetcher.remote_read_child(RemoteReadChildRequest {
block: header.hash(),
block,
header,
storage_key: child_storage_key.0,
keys: vec![key.0.clone()],
@@ -247,12 +297,11 @@ impl<Block, F, B, E, RA> StateBackend<B, E, Block, RA> for LightState<Block, F,
}
fn runtime_version(&self, block: Option<Block::Hash>) -> FutureResult<RuntimeVersion> {
let version = self.call(block, "Core_version".into(), Bytes(Vec::new()))
.and_then(|version| Decode::decode(&mut &version.0[..])
.map_err(|_| client_err(ClientError::VersionInvalid))
);
Box::new(version)
Box::new(runtime_version(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
).boxed().compat())
}
fn query_storage(
@@ -267,31 +316,469 @@ impl<Block, F, B, E, RA> StateBackend<B, E, Block, RA> for LightState<Block, F,
fn subscribe_storage(
&self,
_meta: crate::metadata::Metadata,
_subscriber: Subscriber<StorageChangeSet<Block::Hash>>,
_keys: Option<Vec<StorageKey>>
subscriber: Subscriber<StorageChangeSet<Block::Hash>>,
keys: Option<Vec<StorageKey>>
) {
let keys = match keys {
Some(keys) => keys,
None => {
warn!("Cannot subscribe to all keys on light client. Subscription rejected.");
return;
}
};
let keys = keys.iter().cloned().collect::<HashSet<_>>();
let keys_to_check = keys.iter().map(|k| k.0.clone()).collect::<HashSet<_>>();
let subscription_id = self.subscriptions.add(subscriber, move |sink| {
let fetcher = self.fetcher.clone();
let remote_blockchain = self.remote_blockchain.clone();
let storage_subscriptions = self.storage_subscriptions.clone();
let initial_block = self.block_or_best(None);
let initial_keys = keys_to_check.iter().cloned().collect::<Vec<_>>();
let changes_stream = subscription_stream::<Block, _, _, _, _, _, _, _, _>(
storage_subscriptions.clone(),
self.client
.import_notification_stream()
.map(|notification| Ok::<_, ()>(notification.hash))
.compat(),
display_error(storage(
&*remote_blockchain,
fetcher.clone(),
initial_block,
initial_keys,
).map(move |r| r.map(|r| (initial_block, r)))),
move |block| {
// there'll be single request per block for all active subscriptions
// with all subscribed keys
let keys = storage_subscriptions
.lock()
.subscriptions_by_key
.keys()
.map(|k| k.0.clone())
.collect();
storage(
&*remote_blockchain,
fetcher.clone(),
block,
keys,
)
},
move |block, old_value, new_value| {
// let's only select keys which are valid for this subscription
let new_value = new_value
.iter()
.filter(|(k, _)| keys_to_check.contains(&k.0))
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<HashMap<_, _>>();
let value_differs = old_value
.as_ref()
.map(|old_value| **old_value != new_value)
.unwrap_or(true);
match value_differs {
true => Some(StorageChangeSet {
block,
changes: new_value
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
}),
false => None,
}
}
);
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(changes_stream.map(|changes| Ok(changes)))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
// remember keys associated with this subscription
let mut storage_subscriptions = self.storage_subscriptions.lock();
storage_subscriptions.keys_by_subscription.insert(subscription_id.clone(), keys.clone());
for key in keys {
storage_subscriptions
.subscriptions_by_key
.entry(key)
.or_default()
.insert(subscription_id.clone());
}
}
fn unsubscribe_storage(
&self,
_meta: Option<crate::metadata::Metadata>,
_id: SubscriptionId,
id: SubscriptionId,
) -> RpcResult<bool> {
Ok(false)
if !self.subscriptions.cancel(id.clone()) {
return Ok(false);
}
// forget subscription keys
let mut storage_subscriptions = self.storage_subscriptions.lock();
let keys = storage_subscriptions.keys_by_subscription.remove(&id);
for key in keys.into_iter().flat_map(|keys| keys.into_iter()) {
match storage_subscriptions.subscriptions_by_key.entry(key) {
Entry::Vacant(_) => unreachable!("every key from keys_by_subscription has\
corresponding entry in subscriptions_by_key; qed"),
Entry::Occupied(mut entry) => {
entry.get_mut().remove(&id);
if entry.get().is_empty() {
entry.remove();
}
}
}
}
Ok(true)
}
fn subscribe_runtime_version(
&self,
_meta: crate::metadata::Metadata,
_subscriber: Subscriber<RuntimeVersion>,
subscriber: Subscriber<RuntimeVersion>,
) {
self.subscriptions.add(subscriber, move |sink| {
let fetcher = self.fetcher.clone();
let remote_blockchain = self.remote_blockchain.clone();
let version_subscriptions = self.version_subscriptions.clone();
let initial_block = self.block_or_best(None);
let versions_stream = subscription_stream::<Block, _, _, _, _, _, _, _, _>(
version_subscriptions,
self.client
.import_notification_stream()
.map(|notification| Ok::<_, ()>(notification.hash))
.compat(),
display_error(runtime_version(
&*remote_blockchain,
fetcher.clone(),
initial_block,
).map(move |r| r.map(|r| (initial_block, r)))),
move |block| runtime_version(
&*remote_blockchain,
fetcher.clone(),
block,
),
|_, old_version, new_version| {
let version_differs = old_version
.as_ref()
.map(|old_version| *old_version != new_version)
.unwrap_or(true);
match version_differs {
true => Some(new_version.clone()),
false => None,
}
}
);
sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(versions_stream.map(|version| Ok(version)))
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
}
fn unsubscribe_runtime_version(
&self,
_meta: Option<crate::metadata::Metadata>,
_id: SubscriptionId,
id: SubscriptionId,
) -> RpcResult<bool> {
Ok(false)
Ok(self.subscriptions.cancel(id))
}
}
/// Resolve header by hash.
fn resolve_header<Block: BlockT, F: Fetcher<Block>>(
remote_blockchain: &dyn RemoteBlockchain<Block>,
fetcher: &F,
block: Block::Hash,
) -> impl std::future::Future<Output = Result<Block::Header, Error>> {
let maybe_header = future_header(
remote_blockchain,
fetcher,
BlockId::Hash(block),
);
maybe_header.then(move |result|
ready(result.and_then(|maybe_header|
maybe_header.ok_or(ClientError::UnknownBlock(format!("{}", block)))
).map_err(client_err)),
)
}
/// Call runtime method at given block
fn call<Block: BlockT, F: Fetcher<Block>>(
remote_blockchain: &dyn RemoteBlockchain<Block>,
fetcher: Arc<F>,
block: Block::Hash,
method: String,
call_data: Bytes,
) -> impl std::future::Future<Output = Result<Bytes, Error>> {
resolve_header(remote_blockchain, &*fetcher, block)
.then(move |result| match result {
Ok(header) => Either::Left(fetcher.remote_call(RemoteCallRequest {
block,
header,
method,
call_data: call_data.0,
retry_count: Default::default(),
}).then(|result| ready(result.map(Bytes).map_err(client_err)))),
Err(error) => Either::Right(ready(Err(error))),
})
}
/// Get runtime version at given block.
fn runtime_version<Block: BlockT, F: Fetcher<Block>>(
remote_blockchain: &dyn RemoteBlockchain<Block>,
fetcher: Arc<F>,
block: Block::Hash,
) -> impl std::future::Future<Output = Result<RuntimeVersion, Error>> {
call(
remote_blockchain,
fetcher,
block,
"Core_version".into(),
Bytes(Vec::new()),
)
.then(|version| ready(version.and_then(|version|
Decode::decode(&mut &version.0[..]).map_err(|_| client_err(ClientError::VersionInvalid))
)))
}
/// Get storage value at given key at given block.
fn storage<Block: BlockT, F: Fetcher<Block>>(
remote_blockchain: &dyn RemoteBlockchain<Block>,
fetcher: Arc<F>,
block: Block::Hash,
keys: Vec<Vec<u8>>,
) -> impl std::future::Future<Output = Result<HashMap<StorageKey, Option<StorageData>>, Error>> {
resolve_header(remote_blockchain, &*fetcher, block)
.then(move |result| match result {
Ok(header) => Either::Left(fetcher.remote_read(RemoteReadRequest {
block,
header,
keys,
retry_count: Default::default(),
}).then(|result| ready(result
.map(|result| result
.into_iter()
.map(|(key, value)| (StorageKey(key), value.map(StorageData)))
.collect()
).map_err(client_err)
))),
Err(error) => Either::Right(ready(Err(error))),
})
}
/// Returns subscription stream that issues request on every imported block and
/// if value has changed from previous block, emits (stream) item.
fn subscription_stream<
Block,
Requests,
FutureBlocksStream,
V, N,
InitialRequestFuture,
IssueRequest, IssueRequestFuture,
CompareValues,
>(
shared_requests: Requests,
future_blocks_stream: FutureBlocksStream,
initial_request: InitialRequestFuture,
issue_request: IssueRequest,
compare_values: CompareValues,
) -> impl Stream<Item=N, Error=()> where
Block: BlockT<Hash=H256>,
Requests: 'static + SharedRequests<Block::Hash, V>,
FutureBlocksStream: Stream<Item=Block::Hash, Error=()>,
V: Send + 'static + Clone,
InitialRequestFuture: std::future::Future<Output = Result<(Block::Hash, V), ()>> + Send + 'static,
IssueRequest: 'static + Fn(Block::Hash) -> IssueRequestFuture,
IssueRequestFuture: std::future::Future<Output = Result<V, Error>> + Send + 'static,
CompareValues: Fn(Block::Hash, Option<&V>, &V) -> Option<N>,
{
// we need to send initial value first, then we'll only be sending if value has changed
let previous_value = Arc::new(Mutex::new(None));
// prepare 'stream' of initial values
let initial_value_stream = ignore_error(initial_request)
.boxed()
.compat()
.into_stream();
// prepare stream of future values
//
// we do not want to stop stream if single request fails
// (the warning should have been already issued by the request issuer)
let future_values_stream = future_blocks_stream
.and_then(move |block| ignore_error(maybe_share_remote_request::<Block, _, _, _, _>(
shared_requests.clone(),
block,
&issue_request,
).map(move |r| r.map(|v| (block, v)))).boxed().compat());
// now let's return changed values for selected blocks
initial_value_stream
.chain(future_values_stream)
.filter_map(move |block_and_new_value| block_and_new_value.and_then(|(block, new_value)| {
let mut previous_value = previous_value.lock();
compare_values(block, previous_value.as_ref(), &new_value)
.map(|notification_value| {
*previous_value = Some(new_value);
notification_value
})
}))
.map_err(|_| ())
}
/// Request some data from remote node, probably reusing response from already
/// (in-progress) existing request.
fn maybe_share_remote_request<Block: BlockT, Requests, V, IssueRequest, IssueRequestFuture>(
shared_requests: Requests,
block: Block::Hash,
issue_request: &IssueRequest,
) -> impl std::future::Future<Output = Result<V, ()>> where
V: Clone,
Requests: SharedRequests<Block::Hash, V>,
IssueRequest: Fn(Block::Hash) -> IssueRequestFuture,
IssueRequestFuture: std::future::Future<Output = Result<V, Error>>,
{
let (sender, receiver) = channel();
let need_issue_request = shared_requests.listen_request(block, sender);
// if that isn't the first request - just listen for existing request' response
if !need_issue_request {
return Either::Right(receiver.then(|r| ready(r.unwrap_or(Err(())))));
}
// that is the first request - issue remote request + notify all listeners on
// completion
Either::Left(
display_error(issue_request(block))
.then(move |remote_result| {
let listeners = shared_requests.on_response_received(block);
// skip first element, because this future is the first element
for receiver in listeners.into_iter().skip(1) {
if let Err(_) = receiver.send(remote_result.clone()) {
// we don't care if receiver has been dropped already
}
}
ready(remote_result)
})
)
}
/// Convert successful future result into Ok(result) and error into Err(()),
/// displaying warning.
fn display_error<F, T>(future: F) -> impl std::future::Future<Output=Result<T, ()>> where
F: std::future::Future<Output=Result<T, Error>>
{
future.then(|result| ready(match result {
Ok(result) => Ok(result),
Err(err) => {
warn!("Remote request for subscription data has failed with: {:?}", err);
Err(())
},
}))
}
/// Convert successful future result into Ok(Some(result)) and error into Ok(None),
/// displaying warning.
fn ignore_error<F, T>(future: F) -> impl std::future::Future<Output=Result<Option<T>, ()>> where
F: std::future::Future<Output=Result<T, ()>>
{
future.then(|result| ready(match result {
Ok(result) => Ok(Some(result)),
Err(()) => Ok(None),
}))
}
#[cfg(test)]
mod tests {
use rpc::futures::stream::futures_ordered;
use test_client::runtime::Block;
use super::*;
#[test]
fn subscription_stream_works() {
let stream = subscription_stream::<Block, _, _, _, _, _, _, _, _>(
SimpleSubscriptions::default(),
futures_ordered(vec![result(Ok(H256::from([2; 32]))), result(Ok(H256::from([3; 32])))]),
ready(Ok((H256::from([1; 32]), 100))),
|block| match block[0] {
2 => ready(Ok(100)),
3 => ready(Ok(200)),
_ => unreachable!("should not issue additional requests"),
},
|_, old_value, new_value| match old_value == Some(new_value) {
true => None,
false => Some(new_value.clone()),
}
);
assert_eq!(
stream.collect().wait(),
Ok(vec![100, 200])
);
}
#[test]
fn subscription_stream_ignores_failed_requests() {
let stream = subscription_stream::<Block, _, _, _, _, _, _, _, _>(
SimpleSubscriptions::default(),
futures_ordered(vec![result(Ok(H256::from([2; 32]))), result(Ok(H256::from([3; 32])))]),
ready(Ok((H256::from([1; 32]), 100))),
|block| match block[0] {
2 => ready(Err(client_err(ClientError::NotAvailableOnLightClient))),
3 => ready(Ok(200)),
_ => unreachable!("should not issue additional requests"),
},
|_, old_value, new_value| match old_value == Some(new_value) {
true => None,
false => Some(new_value.clone()),
}
);
assert_eq!(
stream.collect().wait(),
Ok(vec![100, 200])
);
}
#[test]
fn maybe_share_remote_request_shares_request() {
type UnreachableFuture = futures03::future::Ready<Result<u32, Error>>;
let shared_requests = SimpleSubscriptions::default();
// let's 'issue' requests for B1
shared_requests.lock().insert(
H256::from([1; 32]),
vec![channel().0],
);
// make sure that no additional requests are issued when we're asking for B1
let _ = maybe_share_remote_request::<Block, _, _, _, UnreachableFuture>(
shared_requests.clone(),
H256::from([1; 32]),
&|_| unreachable!("no duplicate requests issued"),
);
// make sure that additional requests is issued when we're asking for B2
let request_issued = Arc::new(Mutex::new(false));
let _ = maybe_share_remote_request::<Block, _, _, _, UnreachableFuture>(
shared_requests.clone(),
H256::from([2; 32]),
&|_| {
*request_issued.lock() = true;
ready(Ok(Default::default()))
},
);
assert!(*request_issued.lock());
}
}