// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see .
//! State API backend for light nodes.
use std::{
sync::Arc,
collections::{HashSet, HashMap, hash_map::Entry},
};
use codec::Decode;
use futures::{
future::{ready, Either},
channel::oneshot::{channel, Sender},
FutureExt, TryFutureExt,
StreamExt as _, TryStreamExt as _,
};
use hash_db::Hasher;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn;
use parking_lot::Mutex;
use rpc::{
Result as RpcResult,
futures::Sink,
futures::future::{result, Future},
futures::stream::Stream,
};
use sc_rpc_api::Subscriptions;
use sc_client_api::backend::Backend;
use sp_blockchain::Error as ClientError;
use sc_client::{
BlockchainEvents, Client, CallExecutor,
light::{
blockchain::{future_header, RemoteBlockchain},
fetcher::{Fetcher, RemoteCallRequest, RemoteReadRequest, RemoteReadChildRequest},
},
};
use sp_core::{
Bytes, OpaqueMetadata, storage::{StorageKey, StorageData, StorageChangeSet},
};
use sp_version::RuntimeVersion;
use sp_runtime::{generic::BlockId, traits::{Block as BlockT, HasherFor}};
use super::{StateBackend, error::{FutureResult, Error}, client_err};
/// Storage data map of storage keys => (optional) storage value.
type StorageMap = HashMap>;
/// State API backend for light nodes.
pub struct LightState, B, E, RA> {
client: Arc>,
subscriptions: Subscriptions,
version_subscriptions: SimpleSubscriptions,
storage_subscriptions: Arc>>,
remote_blockchain: Arc>,
fetcher: Arc,
}
/// Shared requests container.
trait SharedRequests: Clone + Send + Sync {
/// Tries to listen for already issued request, or issues request.
///
/// Returns true if requests has been issued.
fn listen_request(
&self,
block: Hash,
sender: Sender>,
) -> bool;
/// Returns (and forgets) all listeners for given request.
fn on_response_received(&self, block: Hash) -> Vec>>;
}
/// Storage subscriptions data.
struct StorageSubscriptions {
/// Active storage requests.
active_requests: HashMap>>>,
/// Map of subscription => keys that this subscription watch for.
keys_by_subscription: HashMap>,
/// Map of key => set of subscriptions that watch this key.
subscriptions_by_key: HashMap>,
}
impl SharedRequests for Arc>> {
fn listen_request(
&self,
block: Block::Hash,
sender: Sender>,
) -> bool {
let mut subscriptions = self.lock();
let active_requests_at = subscriptions.active_requests.entry(block).or_default();
active_requests_at.push(sender);
active_requests_at.len() == 1
}
fn on_response_received(&self, block: Block::Hash) -> Vec>> {
self.lock().active_requests.remove(&block).unwrap_or_default()
}
}
/// Simple, maybe shared, subscription data that shares per block requests.
type SimpleSubscriptions = Arc>>>>>;
impl SharedRequests for SimpleSubscriptions where
Hash: Send + Eq + std::hash::Hash,
V: Send,
{
fn listen_request(
&self,
block: Hash,
sender: Sender>,
) -> bool {
let mut subscriptions = self.lock();
let active_requests_at = subscriptions.entry(block).or_default();
active_requests_at.push(sender);
active_requests_at.len() == 1
}
fn on_response_received(&self, block: Hash) -> Vec>> {
self.lock().remove(&block).unwrap_or_default()
}
}
impl + 'static, B, E, RA> LightState
where
Block: BlockT,
B: Backend + Send + Sync + 'static,
E: CallExecutor + Send + Sync + 'static + Clone,
RA: Send + Sync + 'static,
{
/// Create new state API backend for light nodes.
pub fn new(
client: Arc>,
subscriptions: Subscriptions,
remote_blockchain: Arc>,
fetcher: Arc,
) -> Self {
Self {
client,
subscriptions,
version_subscriptions: Arc::new(Mutex::new(HashMap::new())),
storage_subscriptions: Arc::new(Mutex::new(StorageSubscriptions {
active_requests: HashMap::new(),
keys_by_subscription: HashMap::new(),
subscriptions_by_key: HashMap::new(),
})),
remote_blockchain,
fetcher,
}
}
/// Returns given block hash or best block hash if None is passed.
fn block_or_best(&self, hash: Option) -> Block::Hash {
hash.unwrap_or_else(|| self.client.chain_info().best_hash)
}
}
impl StateBackend for LightState
where
Block: BlockT,
B: Backend + Send + Sync + 'static,
E: CallExecutor + Send + Sync + 'static + Clone,
RA: Send + Sync + 'static,
F: Fetcher + 'static
{
fn call(
&self,
block: Option,
method: String,
call_data: Bytes,
) -> FutureResult {
Box::new(call(
&*self.remote_blockchain,
self.fetcher.clone(),
self.block_or_best(block),
method,
call_data,
).boxed().compat())
}
fn storage_keys(
&self,
_block: Option,
_prefix: StorageKey,
) -> FutureResult> {
Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient))))
}
fn storage(
&self,
block: Option,
key: StorageKey,
) -> FutureResult