Revert "Use Subscription Manager from jsonrpc-pubsub (#6208)" (#6252)

This reverts commit ea1eb4e57f.
This commit is contained in:
André Silva
2020-06-04 15:39:57 +01:00
committed by GitHub
parent 3fef099893
commit 8f5a52fe1a
31 changed files with 285 additions and 196 deletions
+4 -4
View File
@@ -25,10 +25,10 @@ mod state_light;
mod tests;
use std::sync::Arc;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use rpc::{Result as RpcResult, futures::{Future, future::result}};
use sc_rpc_api::state::ReadProof;
use sc_rpc_api::{Subscriptions, state::ReadProof};
use sc_client_api::light::{RemoteBlockchain, Fetcher};
use sp_core::{Bytes, storage::{StorageKey, PrefixedStorageKey, StorageData, StorageChangeSet}};
use sp_version::RuntimeVersion;
@@ -170,7 +170,7 @@ pub trait StateBackend<Block: BlockT, Client>: Send + Sync + 'static
/// Create new state API that works on full node.
pub fn new_full<BE, Block: BlockT, Client>(
client: Arc<Client>,
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
) -> (State<Block, Client>, ChildState<Block, Client>)
where
Block: BlockT + 'static,
@@ -191,7 +191,7 @@ pub fn new_full<BE, Block: BlockT, Client>(
/// Create new state API that works on light node.
pub fn new_light<BE, Block: BlockT, Client, F: Fetcher<Block>>(
client: Arc<Client>,
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
remote_blockchain: Arc<dyn RemoteBlockchain<Block>>,
fetcher: Arc<F>,
) -> (State<Block, Client>, ChildState<Block, Client>)
+4 -4
View File
@@ -21,10 +21,10 @@ use std::sync::Arc;
use std::ops::Range;
use futures::{future, StreamExt as _, TryStreamExt as _};
use log::warn;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use rpc::{Result as RpcResult, futures::{stream, Future, Sink, Stream, future::result}};
use sc_rpc_api::state::ReadProof;
use sc_rpc_api::{Subscriptions, state::ReadProof};
use sc_client_api::backend::Backend;
use sp_blockchain::{Result as ClientResult, Error as ClientError, HeaderMetadata, CachedHeaderMetadata, HeaderBackend};
use sc_client_api::BlockchainEvents;
@@ -60,7 +60,7 @@ struct QueryStorageRange<Block: BlockT> {
/// State API backend for full nodes.
pub struct FullState<BE, Block: BlockT, Client> {
client: Arc<Client>,
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
_phantom: PhantomData<(BE, Block)>
}
@@ -72,7 +72,7 @@ impl<BE, Block: BlockT, Client> FullState<BE, Block, Client>
Block: BlockT + 'static,
{
/// Create new state API backend for full nodes.
pub fn new(client: Arc<Client>, subscriptions: SubscriptionManager) -> Self {
pub fn new(client: Arc<Client>, subscriptions: Subscriptions) -> Self {
Self { client, subscriptions, _phantom: PhantomData }
}
@@ -28,7 +28,7 @@ use futures::{
StreamExt as _, TryStreamExt as _,
};
use hash_db::Hasher;
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn;
use parking_lot::Mutex;
use rpc::{
@@ -38,7 +38,7 @@ use rpc::{
futures::stream::Stream,
};
use sc_rpc_api::state::ReadProof;
use sc_rpc_api::{Subscriptions, state::ReadProof};
use sp_blockchain::{Error as ClientError, HeaderBackend};
use sc_client_api::{
BlockchainEvents,
@@ -63,7 +63,7 @@ type StorageMap = HashMap<StorageKey, Option<StorageData>>;
#[derive(Clone)]
pub struct LightState<Block: BlockT, F: Fetcher<Block>, Client> {
client: Arc<Client>,
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
version_subscriptions: SimpleSubscriptions<Block::Hash, RuntimeVersion>,
storage_subscriptions: Arc<Mutex<StorageSubscriptions<Block>>>,
remote_blockchain: Arc<dyn RemoteBlockchain<Block>>,
@@ -143,7 +143,7 @@ impl<Block: BlockT, F: Fetcher<Block> + 'static, Client> LightState<Block, F, Cl
/// Create new state API backend for light nodes.
pub fn new(
client: Arc<Client>,
subscriptions: SubscriptionManager,
subscriptions: Subscriptions,
remote_blockchain: Arc<dyn RemoteBlockchain<Block>>,
fetcher: Arc<F>,
) -> Self {
+11 -21
View File
@@ -55,7 +55,7 @@ fn should_return_storage() {
.add_extra_child_storage(&child_info, KEY.to_vec(), CHILD_VALUE.to_vec())
.build();
let genesis_hash = client.genesis_hash();
let (client, child) = new_full(Arc::new(client), SubscriptionManager::new(Arc::new(TaskExecutor)));
let (client, child) = new_full(Arc::new(client), Subscriptions::new(Arc::new(TaskExecutor)));
let key = StorageKey(KEY.to_vec());
assert_eq!(
@@ -90,7 +90,7 @@ fn should_return_child_storage() {
.add_child_storage(&child_info, "key", vec![42_u8])
.build());
let genesis_hash = client.genesis_hash();
let (_client, child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor)));
let (_client, child) = new_full(client, Subscriptions::new(Arc::new(TaskExecutor)));
let child_key = prefixed_storage_key();
let key = StorageKey(b"key".to_vec());
@@ -125,7 +125,7 @@ fn should_return_child_storage() {
fn should_call_contract() {
let client = Arc::new(substrate_test_runtime_client::new());
let genesis_hash = client.genesis_hash();
let (client, _child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor)));
let (client, _child) = new_full(client, Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()).wait(),
@@ -139,15 +139,12 @@ fn should_notify_about_storage_changes() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_storage(Default::default(), subscriber, None.into());
// assert id assigned
assert!(matches!(
executor::block_on(id.compat()),
Ok(Ok(SubscriptionId::String(_)))
));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_transfer(runtime::Transfer {
@@ -173,7 +170,7 @@ fn should_send_initial_storage_changes_and_notifications() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let alice_balance_key = blake2_256(&runtime::system::balance_of_key(AccountKeyring::Alice.into()));
@@ -182,10 +179,7 @@ fn should_send_initial_storage_changes_and_notifications() {
]).into());
// assert id assigned
assert!(matches!(
executor::block_on(id.compat()),
Ok(Ok(SubscriptionId::String(_)))
));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_transfer(runtime::Transfer {
@@ -211,7 +205,7 @@ fn should_send_initial_storage_changes_and_notifications() {
#[test]
fn should_query_storage() {
fn run_tests(mut client: Arc<TestClient>, has_changes_trie_config: bool) {
let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let mut add_block = |nonce| {
let mut builder = client.new_block(Default::default()).unwrap();
@@ -428,7 +422,7 @@ fn should_split_ranges() {
#[test]
fn should_return_runtime_version() {
let client = Arc::new(substrate_test_runtime_client::new());
let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let result = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\
\"specVersion\":2,\"implVersion\":2,\"apis\":[[\"0xdf6acb689907609b\",3],\
@@ -451,16 +445,12 @@ fn should_notify_on_runtime_version_initially() {
{
let client = Arc::new(substrate_test_runtime_client::new());
let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_runtime_version(Default::default(), subscriber);
// assert id assigned
assert!(matches!(
executor::block_on(id.compat()),
Ok(Ok(SubscriptionId::String(_)))
));
assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
}
// assert initial version sent.