From 52093c4b7a124293c6d865b493cee578833ccbfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 23 Oct 2018 08:21:05 +0200 Subject: [PATCH] Runtime version pub sub. (#948) --- substrate/core/rpc/src/chain/mod.rs | 61 ++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/substrate/core/rpc/src/chain/mod.rs b/substrate/core/rpc/src/chain/mod.rs index 6c1724a787..f56ec59c1a 100644 --- a/substrate/core/rpc/src/chain/mod.rs +++ b/substrate/core/rpc/src/chain/mod.rs @@ -26,7 +26,7 @@ use rpc::futures::{stream, Future, Sink, Stream}; use runtime_primitives::generic::{BlockId, SignedBlock}; use runtime_primitives::traits::{Block as BlockT, Header, NumberFor}; use runtime_version::RuntimeVersion; -use primitives::{Blake2Hasher}; +use primitives::{Blake2Hasher, storage}; use subscriptions::Subscriptions; @@ -68,6 +68,16 @@ build_rpc_trait! { #[rpc(name = "chain_unsubscribeNewHead", alias = ["unsubscribe_newHead", ])] fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult; } + + #[pubsub(name = "chain_runtimeVersion")] { + /// New runtime version subscription + #[rpc(name = "chain_subscribeRuntimeVersion")] + fn subscribe_runtime_version(&self, Self::Metadata, pubsub::Subscriber); + + /// Unsubscribe from runtime version subscription + #[rpc(name = "chain_unsubscribeRuntimeVersion")] + fn unsubscribe_runtime_version(&self, SubscriptionId) -> RpcResult; + } } } @@ -163,4 +173,53 @@ impl ChainApi, Block:: fn unsubscribe_new_head(&self, id: SubscriptionId) -> RpcResult { Ok(self.subscriptions.cancel(id)) } + + + fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber) { + let stream = match self.client.storage_changes_notification_stream(Some(&[storage::StorageKey(storage::well_known_keys::CODE.to_vec())])) { + Ok(stream) => stream, + Err(err) => { + let _ = subscriber.reject(error::Error::from(err).into()); + return; + } + }; + + self.subscriptions.add(subscriber, |sink| { + let version = self.runtime_version(None.into()) + .map_err(Into::into); + + let client = self.client.clone(); + let mut previous_version = version.clone(); + + let stream = stream + .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) + .filter_map(move |_| { + let version = client.info().and_then(|info| { + client.runtime_version_at(&BlockId::hash(info.chain.best_hash)) + }) + .map_err(error::Error::from) + .map_err(Into::into); + if previous_version != version { + previous_version = version.clone(); + Some(version) + } else { + None + } + }); + + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all( + stream::iter_result(vec![Ok(version)]) + .chain(stream) + ) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }); + } + + + fn unsubscribe_runtime_version(&self, id: SubscriptionId) -> RpcResult { + Ok(self.subscriptions.cancel(id)) + } }