diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index c3fa82eab7..5fc01ecf5e 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1055,7 +1055,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "9.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#4beea5c0b4075fed219373c59d95b0e5137a6792" +source = "git+https://github.com/paritytech/jsonrpc.git#0d78b8f145c18f08c1103f6b0b51991a89fb0a6f" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1067,7 +1067,7 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "9.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#4beea5c0b4075fed219373c59d95b0e5137a6792" +source = "git+https://github.com/paritytech/jsonrpc.git#0d78b8f145c18f08c1103f6b0b51991a89fb0a6f" dependencies = [ "hyper 0.12.17 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 9.0.0 (git+https://github.com/paritytech/jsonrpc.git)", @@ -1080,7 +1080,7 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "9.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#4beea5c0b4075fed219373c59d95b0e5137a6792" +source = "git+https://github.com/paritytech/jsonrpc.git#0d78b8f145c18f08c1103f6b0b51991a89fb0a6f" dependencies = [ "jsonrpc-core 9.0.0 (git+https://github.com/paritytech/jsonrpc.git)", "jsonrpc-pubsub 9.0.0 (git+https://github.com/paritytech/jsonrpc.git)", @@ -1090,7 +1090,7 @@ dependencies = [ [[package]] name = "jsonrpc-pubsub" version = "9.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#4beea5c0b4075fed219373c59d95b0e5137a6792" +source = "git+https://github.com/paritytech/jsonrpc.git#0d78b8f145c18f08c1103f6b0b51991a89fb0a6f" dependencies = [ "jsonrpc-core 9.0.0 (git+https://github.com/paritytech/jsonrpc.git)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1100,7 +1100,7 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "9.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#4beea5c0b4075fed219373c59d95b0e5137a6792" +source = "git+https://github.com/paritytech/jsonrpc.git#0d78b8f145c18f08c1103f6b0b51991a89fb0a6f" dependencies = [ "bytes 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "globset 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1116,7 +1116,7 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "9.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#4beea5c0b4075fed219373c59d95b0e5137a6792" +source = "git+https://github.com/paritytech/jsonrpc.git#0d78b8f145c18f08c1103f6b0b51991a89fb0a6f" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 9.0.0 (git+https://github.com/paritytech/jsonrpc.git)", diff --git a/substrate/core/rpc/src/author/mod.rs b/substrate/core/rpc/src/author/mod.rs index 9d3482f1ee..7e26035525 100644 --- a/substrate/core/rpc/src/author/mod.rs +++ b/substrate/core/rpc/src/author/mod.rs @@ -64,7 +64,7 @@ build_rpc_trait! { /// Unsubscribe from extrinsic watching. #[rpc(name = "author_unwatchExtrinsic")] - fn unwatch_extrinsic(&self, SubscriptionId) -> Result; + fn unwatch_extrinsic(&self, Self::Metadata, SubscriptionId) -> Result; } } @@ -149,7 +149,7 @@ impl AuthorApi, BlockHash

> for Author whe }) } - fn unwatch_extrinsic(&self, id: SubscriptionId) -> Result { + fn unwatch_extrinsic(&self, _metadata: Self::Metadata, id: SubscriptionId) -> Result { Ok(self.subscriptions.cancel(id)) } } diff --git a/substrate/core/rpc/src/chain/mod.rs b/substrate/core/rpc/src/chain/mod.rs index e836857bc9..c4e0899fa1 100644 --- a/substrate/core/rpc/src/chain/mod.rs +++ b/substrate/core/rpc/src/chain/mod.rs @@ -21,13 +21,11 @@ use std::sync::Arc; use client::{self, Client, BlockchainEvents}; use jsonrpc_macros::{pubsub, Trailing}; use jsonrpc_pubsub::SubscriptionId; +use primitives::{H256, Blake2Hasher}; use rpc::Result as RpcResult; use rpc::futures::{stream, Future, Sink, Stream}; -use primitives::H256; use runtime_primitives::generic::{BlockId, SignedBlock}; use runtime_primitives::traits::{Block as BlockT, Header, NumberFor}; -use runtime_version::RuntimeVersion; -use primitives::{Blake2Hasher, storage}; use subscriptions::Subscriptions; @@ -60,10 +58,6 @@ build_rpc_trait! { #[rpc(name = "chain_getFinalisedHead")] fn finalised_head(&self) -> Result; - /// Get the runtime version. - #[rpc(name = "chain_getRuntimeVersion")] - fn runtime_version(&self, Trailing) -> Result; - #[pubsub(name = "chain_newHead")] { /// New head subscription #[rpc(name = "chain_subscribeNewHead", alias = ["subscribe_newHead", ])] @@ -71,7 +65,7 @@ build_rpc_trait! { /// Unsubscribe from new head subscription. #[rpc(name = "chain_unsubscribeNewHead", alias = ["unsubscribe_newHead", ])] - fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult; + fn unsubscribe_new_head(&self, Self::Metadata, SubscriptionId) -> RpcResult; } #[pubsub(name = "chain_finalisedHead")] { @@ -81,17 +75,7 @@ build_rpc_trait! { /// Unsubscribe from new head subscription. #[rpc(name = "chain_unsubscribeFinalisedHeads")] - fn unsubscribe_finalised_heads(&self, SubscriptionId) -> RpcResult; - } - - #[pubsub(name = "chain_runtimeVersion")] { - /// New runtime version subscription - #[rpc(name = "chain_subscribeRuntimeVersion")] - fn subscribe_runtime_version(&self, Self::Metadata, pubsub::Subscriber); - - /// Unsubscribe from runtime version subscription - #[rpc(name = "chain_unsubscribeRuntimeVersion")] - fn unsubscribe_runtime_version(&self, SubscriptionId) -> RpcResult; + fn unsubscribe_finalised_heads(&self, Self::Metadata, SubscriptionId) -> RpcResult; } } } @@ -195,11 +179,6 @@ impl ChainApi, Sig Ok(self.client.info()?.chain.finalized_hash) } - fn runtime_version(&self, at: Trailing) -> Result { - let at = self.unwrap_or_best(at)?; - Ok(self.client.runtime_version_at(&BlockId::Hash(at))?) - } - fn subscribe_new_head(&self, _metadata: Self::Metadata, subscriber: pubsub::Subscriber) { self.subscribe_headers( subscriber, @@ -210,7 +189,7 @@ impl ChainApi, Sig ) } - fn unsubscribe_new_head(&self, id: SubscriptionId) -> RpcResult { + fn unsubscribe_new_head(&self, _metadata: Self::Metadata, id: SubscriptionId) -> RpcResult { Ok(self.subscriptions.cancel(id)) } @@ -223,55 +202,7 @@ impl ChainApi, Sig ) } - fn unsubscribe_finalised_heads(&self, id: SubscriptionId) -> RpcResult { - Ok(self.subscriptions.cancel(id)) - } - - fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber) { - let stream = match self.client.storage_changes_notification_stream(Some(&[storage::StorageKey(storage::well_known_keys::CODE.to_vec())])) { - Ok(stream) => stream, - Err(err) => { - let _ = subscriber.reject(error::Error::from(err).into()); - return; - } - }; - - self.subscriptions.add(subscriber, |sink| { - let version = self.runtime_version(None.into()) - .map_err(Into::into); - - let client = self.client.clone(); - let mut previous_version = version.clone(); - - let stream = stream - .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) - .filter_map(move |_| { - let version = client.info().and_then(|info| { - client.runtime_version_at(&BlockId::hash(info.chain.best_hash)) - }) - .map_err(error::Error::from) - .map_err(Into::into); - if previous_version != version { - previous_version = version.clone(); - Some(version) - } else { - None - } - }); - - sink - .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all( - stream::iter_result(vec![Ok(version)]) - .chain(stream) - ) - // we ignore the resulting Stream (if the first stream is over we are unsubscribed) - .map(|_| ()) - }); - } - - - fn unsubscribe_runtime_version(&self, id: SubscriptionId) -> RpcResult { + fn unsubscribe_finalised_heads(&self, _metadata: Self::Metadata, id: SubscriptionId) -> RpcResult { Ok(self.subscriptions.cancel(id)) } } diff --git a/substrate/core/rpc/src/chain/tests.rs b/substrate/core/rpc/src/chain/tests.rs index 44e59b08f9..b3f9ee93e2 100644 --- a/substrate/core/rpc/src/chain/tests.rs +++ b/substrate/core/rpc/src/chain/tests.rs @@ -17,7 +17,7 @@ use super::*; use jsonrpc_macros::pubsub; use test_client::{self, TestClient}; -use test_client::runtime::{Block, Header, VERSION}; +use test_client::runtime::{Block, Header}; use consensus::BlockOrigin; #[test] @@ -246,19 +246,3 @@ fn should_notify_about_finalised_block() { // no more notifications on this channel assert_eq!(core.block_on(next.into_future()).unwrap().0, None); } - -#[test] -fn should_return_runtime_version() { - let core = ::tokio::runtime::Runtime::new().unwrap(); - let remote = core.executor(); - - let client = Chain { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(remote), - }; - - assert_matches!( - client.runtime_version(None.into()), - Ok(ref ver) if ver == &VERSION - ); -} diff --git a/substrate/core/rpc/src/state/mod.rs b/substrate/core/rpc/src/state/mod.rs index f1ad6a67c2..d044962f0a 100644 --- a/substrate/core/rpc/src/state/mod.rs +++ b/substrate/core/rpc/src/state/mod.rs @@ -25,14 +25,14 @@ use client::{self, Client, CallExecutor, BlockchainEvents, runtime_api::Metadata use jsonrpc_macros::Trailing; use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; -use primitives::H256; +use primitives::{H256, Blake2Hasher, Bytes}; use primitives::hexdisplay::HexDisplay; -use primitives::storage::{StorageKey, StorageData, StorageChangeSet}; -use primitives::{Blake2Hasher, Bytes}; +use primitives::storage::{self, StorageKey, StorageData, StorageChangeSet}; use rpc::Result as RpcResult; use rpc::futures::{stream, Future, Sink, Stream}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Block as BlockT, Header, ProvideRuntimeApi}; +use runtime_version::RuntimeVersion; use subscriptions::Subscriptions; @@ -67,6 +67,10 @@ build_rpc_trait! { #[rpc(name = "state_getMetadata")] fn metadata(&self, Trailing) -> Result; + /// Get the runtime version. + #[rpc(name = "state_getRuntimeVersion", alias = ["chain_getRuntimeVersion", ])] + fn runtime_version(&self, Trailing) -> Result; + /// Query historical storage entries (by key) starting from a block given as the second parameter. /// /// NOTE This first returned result contains the initial state of storage for all keys. @@ -74,6 +78,16 @@ build_rpc_trait! { #[rpc(name = "state_queryStorage")] fn query_storage(&self, Vec, Hash, Trailing) -> Result>>; + #[pubsub(name = "state_runtimeVersion")] { + /// New runtime version subscription + #[rpc(name = "state_subscribeRuntimeVersion", alias = ["chain_subscribeRuntimeVersion", ])] + fn subscribe_runtime_version(&self, Self::Metadata, pubsub::Subscriber); + + /// Unsubscribe from runtime version subscription + #[rpc(name = "state_unsubscribeRuntimeVersion", alias = ["chain_unsubscribeRuntimeVersion", ])] + fn unsubscribe_runtime_version(&self, Self::Metadata, SubscriptionId) -> RpcResult; + } + #[pubsub(name = "state_storage")] { /// New storage subscription #[rpc(name = "state_subscribeStorage")] @@ -81,7 +95,7 @@ build_rpc_trait! { /// Unsubscribe from storage subscription #[rpc(name = "state_unsubscribeStorage")] - fn unsubscribe_storage(&self, SubscriptionId) -> RpcResult; + fn unsubscribe_storage(&self, Self::Metadata, SubscriptionId) -> RpcResult; } } } @@ -268,7 +282,59 @@ impl StateApi for State where }) } - fn unsubscribe_storage(&self, id: SubscriptionId) -> RpcResult { + fn unsubscribe_storage(&self, _meta: Self::Metadata, id: SubscriptionId) -> RpcResult { + Ok(self.subscriptions.cancel(id)) + } + + fn runtime_version(&self, at: Trailing) -> Result { + let at = self.unwrap_or_best(at)?; + Ok(self.client.runtime_version_at(&BlockId::Hash(at))?) + } + + fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber) { + let stream = match self.client.storage_changes_notification_stream(Some(&[StorageKey(storage::well_known_keys::CODE.to_vec())])) { + Ok(stream) => stream, + Err(err) => { + let _ = subscriber.reject(error::Error::from(err).into()); + return; + } + }; + + self.subscriptions.add(subscriber, |sink| { + let version = self.runtime_version(None.into()) + .map_err(Into::into); + + let client = self.client.clone(); + let mut previous_version = version.clone(); + + let stream = stream + .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) + .filter_map(move |_| { + let version = client.info().and_then(|info| { + client.runtime_version_at(&BlockId::hash(info.chain.best_hash)) + }) + .map_err(error::Error::from) + .map_err(Into::into); + if previous_version != version { + previous_version = version.clone(); + Some(version) + } else { + None + } + }); + + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all( + stream::iter_result(vec![Ok(version)]) + .chain(stream) + ) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }); + } + + fn unsubscribe_runtime_version(&self, _meta: Self::Metadata, id: SubscriptionId) -> RpcResult { Ok(self.subscriptions.cancel(id)) } } diff --git a/substrate/core/rpc/src/state/tests.rs b/substrate/core/rpc/src/state/tests.rs index 1f5b968ff3..3fe306d0bc 100644 --- a/substrate/core/rpc/src/state/tests.rs +++ b/substrate/core/rpc/src/state/tests.rs @@ -178,3 +178,39 @@ fn should_query_storage() { }); assert_eq!(result.unwrap(), expected); } + + +#[test] +fn should_return_runtime_version() { + let core = ::tokio::runtime::Runtime::new().unwrap(); + + let client = Arc::new(test_client::new()); + let api = State::new(client.clone(), Subscriptions::new(core.executor())); + + assert_matches!( + api.runtime_version(None.into()), + Ok(ref ver) if ver == &runtime::VERSION + ); +} + +#[test] +fn should_notify_on_runtime_version_initially() { + let mut core = ::tokio::runtime::Runtime::new().unwrap(); + let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); + + { + let client = Arc::new(test_client::new()); + let api = State::new(client.clone(), Subscriptions::new(core.executor())); + + api.subscribe_runtime_version(Default::default(), subscriber); + + // assert id assigned + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); + } + + // assert initial version sent. + let (notification, next) = core.block_on(transport.into_future()).unwrap(); + assert!(notification.is_some()); + // no more notifications on this channel + assert_eq!(core.block_on(next.into_future()).unwrap().0, None); +}